// operator/expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86    table_name: &TableReference<'_>,
87    column_schemas: &[api::v1::ColumnSchema],
88    engine: &str,
89    desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92    let expr = common_grpc_expr::util::build_create_table_expr(
93        None,
94        table_name,
95        column_exprs,
96        engine,
97        desc.unwrap_or("Created on insertion"),
98    )
99    .context(BuildCreateExprOnInsertionSnafu)?;
100
101    validate_create_expr(&expr)?;
102    Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106    schema: &Schema,
107    column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110        .context(FindNewColumnsOnInsertionSnafu)?;
111    if let Some(add_columns) = &add_columns {
112        validate_add_columns_expr(add_columns)?;
113    }
114    Ok(add_columns)
115}
116
117//   cpu float64,
118//   memory float64,
119//   TIME INDEX (ts),
120//   PRIMARY KEY(host)
121// ) WITH (location='/var/data/city.csv', format='csv');
122// ```
123// The user needs to specify the TIME INDEX column. If there is no suitable
124// column in the file to use as TIME INDEX, an additional placeholder column
125// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
126// should be added.
127//
128//
129// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
130// ```sql
131// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
132// ```
133// 1. If the TIME INDEX column can be inferred from metadata, use that column
134//    as the TIME INDEX. Otherwise,
135// 2. If a column named `greptime_timestamp` exists (with the requirement that
136//    the column is with type TIMESTAMP, otherwise an error is thrown), use
137//    that column as the TIME INDEX. Otherwise,
138// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
139//    constraint.
pub(crate) async fn create_external_expr(
    create: CreateExternalTable,
    query_ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    // Resolve the (possibly partially qualified) table identifier against the
    // session's current catalog/schema.
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name, query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let mut table_options = create.options.into_map();

    // Locate the external files referenced by the statement's WITH options.
    let (object_store, files) = prepare_file_table_files(&table_options)
        .await
        .context(PrepareFileTableSnafu)?;

    // Schema inferred from the files themselves; used below both for the
    // compatibility check and (in the inferred form) as the table schema.
    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
        .await
        .context(InferFileTableSchemaSnafu)?
        .column_schemas;

    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
        // expanded form: columns and constraints were spelled out by the user.
        let time_index = find_time_index(&create.constraints)?;
        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
        let column_schemas =
            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
        (time_index, primary_keys, column_schemas)
    } else {
        // inferred form: derive the table schema and time index from the file
        // schema (resolution rules are documented in the comment above).
        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
        let primary_keys = vec![];
        (time_index, primary_keys, column_schemas)
    };

    // The declared table schema must be satisfiable by the file schema.
    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
        .context(SchemaIncompatibleSnafu)?;

    // Persist the file list and inferred file schema into the table options
    // (as JSON under FILE_TABLE_META_KEY) so the file engine can reopen them.
    let meta = FileOptions {
        files,
        file_column_schemas,
    };
    table_options.insert(
        FILE_TABLE_META_KEY.to_string(),
        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
    );

    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
    let expr = CreateTableExpr {
        catalog_name,
        schema_name,
        table_name,
        desc: String::default(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        table_id: None,
        engine: create.engine.clone(),
    };

    Ok(expr)
}
203
204/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
205pub fn create_to_expr(
206    create: &CreateTable,
207    query_ctx: &QueryContextRef,
208) -> Result<CreateTableExpr> {
209    let (catalog_name, schema_name, table_name) =
210        table_idents_to_full_name(&create.name, query_ctx)
211            .map_err(BoxedError::new)
212            .context(ExternalSnafu)?;
213
214    let time_index = find_time_index(&create.constraints)?;
215    let table_options = HashMap::from(
216        &TableOptions::try_from_iter(create.options.to_str_map())
217            .context(UnrecognizedTableOptionSnafu)?,
218    );
219
220    let mut table_options = table_options;
221    if table_options.contains_key(COMPACTION_TYPE) {
222        table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
223    }
224
225    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
226
227    let expr = CreateTableExpr {
228        catalog_name,
229        schema_name,
230        table_name,
231        desc: String::default(),
232        column_defs: columns_to_expr(
233            &create.columns,
234            &time_index,
235            &primary_keys,
236            Some(&query_ctx.timezone()),
237        )?,
238        time_index,
239        primary_keys,
240        create_if_not_exists: create.if_not_exists,
241        table_options,
242        table_id: None,
243        engine: create.engine.clone(),
244    };
245
246    validate_create_expr(&expr)?;
247    Ok(expr)
248}
249
/// Convert gRPC's [`CreateTableExpr`] back to `CreateTable` statement.
/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
///
/// # Parameters
///
/// * `expr` - The `CreateTableExpr` to convert
/// * `quote_style` - Optional quote style for identifiers (defaults to MySQL style ` backtick)
pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
    let quote_style = quote_style.unwrap_or('`');

    // Convert table name. Note only the bare table name is emitted here; the
    // catalog/schema from `expr` are not part of the generated statement.
    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
    )]);

    // Convert columns
    let mut columns = Vec::with_capacity(expr.column_defs.len());
    for column_def in &expr.column_defs {
        // Round-trip through `ColumnSchema` to recover default constraints,
        // comments, and index options from the wire representation.
        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
            column: &column_def.name,
        })?;

        let mut options = Vec::new();

        // Add NULL/NOT NULL constraint
        if column_def.is_nullable {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Null,
            });
        } else {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::NotNull,
            });
        }

        // Add DEFAULT constraint if present. Value defaults become SQL value
        // literals; function defaults are re-parsed into a SQL expression.
        if let Some(default_constraint) = column_schema.default_constraint() {
            let expr = match default_constraint {
                ColumnDefaultConstraint::Value(v) => {
                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
                }
                ColumnDefaultConstraint::Function(func_expr) => {
                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
                        .context(ParseSqlSnafu)?
                }
            };
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Default(expr),
            });
        }

        // Add COMMENT if present
        if !column_def.comment.is_empty() {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Comment(column_def.comment.clone()),
            });
        }

        // Note: We don't add inline PRIMARY KEY options here,
        // we'll handle all primary keys as constraints instead for consistency

        // Handle column extensions (fulltext, inverted index, skipping index)
        let mut extensions = ColumnExtensions::default();

        // Add fulltext index options if present (only when the index is
        // actually enabled).
        if let Ok(Some(opt)) = column_schema.fulltext_options()
            && opt.enable
        {
            let mut map = HashMap::from([
                (
                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
                    opt.analyzer.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
                    opt.case_sensitive.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
                    opt.backend.to_string(),
                ),
            ]);
            // Granularity and false-positive rate only apply to the Bloom
            // backend, so they are emitted conditionally.
            if opt.backend == FulltextBackend::Bloom {
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                );
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                );
            }
            extensions.fulltext_index_options = Some(map.into());
        }

        // Add skipping index options if present
        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
            let map = HashMap::from([
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
                    opt.index_type.to_string(),
                ),
            ]);
            extensions.skipping_index_options = Some(map.into());
        }

        // Add inverted index options if present. The inverted index takes no
        // options, so an empty map merely marks its presence.
        if column_schema.is_inverted_indexed() {
            extensions.inverted_index_options = Some(HashMap::new().into());
        }

        let sql_column = SqlColumn {
            column_def: ColumnDef {
                name: Ident::with_quote(quote_style, &column_def.name),
                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
                    .context(ParseSqlSnafu)?,
                options,
            },
            extensions,
        };

        columns.push(sql_column);
    }

    // Convert constraints
    let mut constraints = Vec::new();

    // Add TIME INDEX constraint
    constraints.push(TableConstraint::TimeIndex {
        column: Ident::with_quote(quote_style, &expr.time_index),
    });

    // Add PRIMARY KEY constraint (always add as constraint for consistency)
    if !expr.primary_keys.is_empty() {
        let primary_key_columns: Vec<Ident> = expr
            .primary_keys
            .iter()
            .map(|pk| Ident::with_quote(quote_style, pk))
            .collect();

        constraints.push(TableConstraint::PrimaryKey {
            columns: primary_key_columns,
        });
    }

    // Convert table options
    let mut options = OptionMap::default();
    for (key, value) in &expr.table_options {
        options.insert(key.clone(), value.clone());
    }

    Ok(CreateTable {
        if_not_exists: expr.create_if_not_exists,
        // An absent table id maps to 0 in the statement.
        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
        name: table_name,
        columns,
        engine: expr.engine.clone(),
        constraints,
        options,
        partitions: None,
    })
}
424
425/// Validate the [`CreateTableExpr`] request.
426pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
427    // construct column list
428    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
429    for (idx, column) in create.column_defs.iter().enumerate() {
430        if let Some(indices) = column_to_indices.get(&column.name) {
431            return InvalidSqlSnafu {
432                err_msg: format!(
433                    "column name `{}` is duplicated at index {} and {}",
434                    column.name, indices, idx
435                ),
436            }
437            .fail();
438        }
439        column_to_indices.insert(&column.name, idx);
440    }
441
442    // verify time_index exists
443    let _ = column_to_indices
444        .get(&create.time_index)
445        .with_context(|| InvalidSqlSnafu {
446            err_msg: format!(
447                "column name `{}` is not found in column list",
448                create.time_index
449            ),
450        })?;
451
452    // verify primary_key exists
453    for pk in &create.primary_keys {
454        let _ = column_to_indices
455            .get(&pk)
456            .with_context(|| InvalidSqlSnafu {
457                err_msg: format!("column name `{}` is not found in column list", pk),
458            })?;
459    }
460
461    // construct primary_key set
462    let mut pk_set = HashSet::new();
463    for pk in &create.primary_keys {
464        if !pk_set.insert(pk) {
465            return InvalidSqlSnafu {
466                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
467            }
468            .fail();
469        }
470    }
471
472    // verify time index is not primary key
473    if pk_set.contains(&create.time_index) {
474        return InvalidSqlSnafu {
475            err_msg: format!(
476                "column name `{}` is both primary key and time index",
477                create.time_index
478            ),
479        }
480        .fail();
481    }
482
483    for column in &create.column_defs {
484        // verify do not contain interval type column issue #3235
485        if is_interval_type(&column.data_type()) {
486            return InvalidSqlSnafu {
487                err_msg: format!(
488                    "column name `{}` is interval type, which is not supported",
489                    column.name
490                ),
491            }
492            .fail();
493        }
494        // verify do not contain datetime type column issue #5489
495        if is_date_time_type(&column.data_type()) {
496            return InvalidSqlSnafu {
497                err_msg: format!(
498                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
499                    column.name
500                ),
501            }
502            .fail();
503        }
504    }
505    Ok(())
506}
507
508fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
509    for add_column in &add_columns.add_columns {
510        let Some(column_def) = &add_column.column_def else {
511            continue;
512        };
513        if is_date_time_type(&column_def.data_type()) {
514            return InvalidSqlSnafu {
515                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
516                }
517                .fail();
518        }
519        if is_interval_type(&column_def.data_type()) {
520            return InvalidSqlSnafu {
521                err_msg: format!(
522                    "column name `{}` is interval type, which is not supported",
523                    column_def.name
524                ),
525            }
526            .fail();
527        }
528    }
529    Ok(())
530}
531
532fn is_date_time_type(data_type: &ColumnDataType) -> bool {
533    matches!(data_type, ColumnDataType::Datetime)
534}
535
536fn is_interval_type(data_type: &ColumnDataType) -> bool {
537    matches!(
538        data_type,
539        ColumnDataType::IntervalYearMonth
540            | ColumnDataType::IntervalDayTime
541            | ColumnDataType::IntervalMonthDayNano
542    )
543}
544
545fn find_primary_keys(
546    columns: &[SqlColumn],
547    constraints: &[TableConstraint],
548) -> Result<Vec<String>> {
549    let columns_pk = columns
550        .iter()
551        .filter_map(|x| {
552            if x.options().iter().any(|o| {
553                matches!(
554                    o.option,
555                    ColumnOption::Unique {
556                        is_primary: true,
557                        ..
558                    }
559                )
560            }) {
561                Some(x.name().value.clone())
562            } else {
563                None
564            }
565        })
566        .collect::<Vec<String>>();
567
568    ensure!(
569        columns_pk.len() <= 1,
570        IllegalPrimaryKeysDefSnafu {
571            msg: "not allowed to inline multiple primary keys in columns options"
572        }
573    );
574
575    let constraints_pk = constraints
576        .iter()
577        .filter_map(|constraint| match constraint {
578            TableConstraint::PrimaryKey { columns, .. } => {
579                Some(columns.iter().map(|ident| ident.value.clone()))
580            }
581            _ => None,
582        })
583        .flatten()
584        .collect::<Vec<String>>();
585
586    ensure!(
587        columns_pk.is_empty() || constraints_pk.is_empty(),
588        IllegalPrimaryKeysDefSnafu {
589            msg: "found definitions of primary keys in multiple places"
590        }
591    );
592
593    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
594    primary_keys.extend(columns_pk);
595    primary_keys.extend(constraints_pk);
596    Ok(primary_keys)
597}
598
599pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
600    let time_index = constraints
601        .iter()
602        .filter_map(|constraint| match constraint {
603            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
604            _ => None,
605        })
606        .collect::<Vec<&String>>();
607    ensure!(
608        time_index.len() == 1,
609        InvalidSqlSnafu {
610            err_msg: "must have one and only one TimeIndex columns",
611        }
612    );
613    Ok(time_index[0].clone())
614}
615
616fn columns_to_expr(
617    column_defs: &[SqlColumn],
618    time_index: &str,
619    primary_keys: &[String],
620    timezone: Option<&Timezone>,
621) -> Result<Vec<api::v1::ColumnDef>> {
622    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
623    column_schemas_to_defs(column_schemas, primary_keys)
624}
625
626fn columns_to_column_schemas(
627    columns: &[SqlColumn],
628    time_index: &str,
629    timezone: Option<&Timezone>,
630) -> Result<Vec<ColumnSchema>> {
631    columns
632        .iter()
633        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
634        .collect::<Result<Vec<ColumnSchema>>>()
635}
636
637// TODO(weny): refactor this function to use `try_as_column_def`
638pub fn column_schemas_to_defs(
639    column_schemas: Vec<ColumnSchema>,
640    primary_keys: &[String],
641) -> Result<Vec<api::v1::ColumnDef>> {
642    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
643        .iter()
644        .map(|c| {
645            ColumnDataTypeWrapper::try_from(c.data_type.clone())
646                .map(|w| w.to_parts())
647                .context(ColumnDataTypeSnafu)
648        })
649        .collect::<Result<Vec<_>>>()?;
650
651    column_schemas
652        .iter()
653        .zip(column_datatypes)
654        .map(|(schema, datatype)| {
655            let semantic_type = if schema.is_time_index() {
656                SemanticType::Timestamp
657            } else if primary_keys.contains(&schema.name) {
658                SemanticType::Tag
659            } else {
660                SemanticType::Field
661            } as i32;
662            let comment = schema
663                .metadata()
664                .get(COMMENT_KEY)
665                .cloned()
666                .unwrap_or_default();
667
668            Ok(api::v1::ColumnDef {
669                name: schema.name.clone(),
670                data_type: datatype.0 as i32,
671                is_nullable: schema.is_nullable(),
672                default_constraint: match schema.default_constraint() {
673                    None => vec![],
674                    Some(v) => {
675                        v.clone()
676                            .try_into()
677                            .context(ConvertColumnDefaultConstraintSnafu {
678                                column_name: &schema.name,
679                            })?
680                    }
681                },
682                semantic_type,
683                comment,
684                datatype_extension: datatype.1,
685                options: options_from_column_schema(schema),
686            })
687        })
688        .collect()
689}
690
/// A parsed `ALTER TABLE ... REPARTITION` request, with the table resolved to
/// a fully qualified name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionRequest {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    // Partition expressions taken verbatim from the statement's FROM side
    // (presumably the current layout — confirm against the executor).
    pub from_exprs: Vec<Expr>,
    // Partition expressions from the INTO side (the target layout).
    pub into_exprs: Vec<Expr>,
}
699
700pub(crate) fn to_repartition_request(
701    alter_table: AlterTable,
702    query_ctx: &QueryContextRef,
703) -> Result<RepartitionRequest> {
704    let (catalog_name, schema_name, table_name) =
705        table_idents_to_full_name(alter_table.table_name(), query_ctx)
706            .map_err(BoxedError::new)
707            .context(ExternalSnafu)?;
708
709    let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else {
710        return InvalidSqlSnafu {
711            err_msg: "expected REPARTITION operation",
712        }
713        .fail();
714    };
715
716    Ok(RepartitionRequest {
717        catalog_name,
718        schema_name,
719        table_name,
720        from_exprs: operation.from_exprs,
721        into_exprs: operation.into_exprs,
722    })
723}
724
725/// Converts a SQL alter table statement into a gRPC alter table expression.
726pub(crate) fn to_alter_table_expr(
727    alter_table: AlterTable,
728    query_ctx: &QueryContextRef,
729) -> Result<AlterTableExpr> {
730    let (catalog_name, schema_name, table_name) =
731        table_idents_to_full_name(alter_table.table_name(), query_ctx)
732            .map_err(BoxedError::new)
733            .context(ExternalSnafu)?;
734
735    let kind = match alter_table.alter_operation {
736        AlterTableOperation::AddConstraint(_) => {
737            return NotSupportedSnafu {
738                feat: "ADD CONSTRAINT",
739            }
740            .fail();
741        }
742        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
743            add_columns: add_columns
744                .into_iter()
745                .map(|add_column| {
746                    let column_def = sql_column_def_to_grpc_column_def(
747                        &add_column.column_def,
748                        Some(&query_ctx.timezone()),
749                    )
750                    .map_err(BoxedError::new)
751                    .context(ExternalSnafu)?;
752                    if is_interval_type(&column_def.data_type()) {
753                        return NotSupportedSnafu {
754                            feat: "Add column with interval type",
755                        }
756                        .fail();
757                    }
758                    Ok(AddColumn {
759                        column_def: Some(column_def),
760                        location: add_column.location.as_ref().map(From::from),
761                        add_if_not_exists: add_column.add_if_not_exists,
762                    })
763                })
764                .collect::<Result<Vec<AddColumn>>>()?,
765        }),
766        AlterTableOperation::ModifyColumnType {
767            column_name,
768            target_type,
769        } => {
770            let target_type =
771                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
772                    .context(ParseSqlSnafu)?;
773            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
774                .map(|w| w.to_parts())
775                .context(ColumnDataTypeSnafu)?;
776            if is_interval_type(&target_type) {
777                return NotSupportedSnafu {
778                    feat: "Modify column type to interval type",
779                }
780                .fail();
781            }
782            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
783                modify_column_types: vec![ModifyColumnType {
784                    column_name: column_name.value,
785                    target_type: target_type as i32,
786                    target_type_extension,
787                }],
788            })
789        }
790        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
791            drop_columns: vec![DropColumn {
792                name: name.value.clone(),
793            }],
794        }),
795        AlterTableOperation::RenameTable { new_table_name } => {
796            AlterTableKind::RenameTable(RenameTable {
797                new_table_name: new_table_name.clone(),
798            })
799        }
800        AlterTableOperation::SetTableOptions { options } => {
801            AlterTableKind::SetTableOptions(SetTableOptions {
802                table_options: options.into_iter().map(Into::into).collect(),
803            })
804        }
805        AlterTableOperation::UnsetTableOptions { keys } => {
806            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
807        }
808        AlterTableOperation::Repartition { .. } => {
809            return NotSupportedSnafu {
810                feat: "ALTER TABLE ... REPARTITION",
811            }
812            .fail();
813        }
814        AlterTableOperation::SetIndex { options } => {
815            let option = match options {
816                sql::statements::alter::SetIndexOperation::Fulltext {
817                    column_name,
818                    options,
819                } => SetIndex {
820                    options: Some(set_index::Options::Fulltext(SetFulltext {
821                        column_name: column_name.value,
822                        enable: options.enable,
823                        analyzer: match options.analyzer {
824                            FulltextAnalyzer::English => Analyzer::English.into(),
825                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
826                        },
827                        case_sensitive: options.case_sensitive,
828                        backend: match options.backend {
829                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
830                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
831                        },
832                        granularity: options.granularity as u64,
833                        false_positive_rate: options.false_positive_rate(),
834                    })),
835                },
836                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
837                    options: Some(set_index::Options::Inverted(SetInverted {
838                        column_name: column_name.value,
839                    })),
840                },
841                sql::statements::alter::SetIndexOperation::Skipping {
842                    column_name,
843                    options,
844                } => SetIndex {
845                    options: Some(set_index::Options::Skipping(SetSkipping {
846                        column_name: column_name.value,
847                        enable: true,
848                        granularity: options.granularity as u64,
849                        false_positive_rate: options.false_positive_rate(),
850                        skipping_index_type: match options.index_type {
851                            SkippingIndexType::BloomFilter => {
852                                PbSkippingIndexType::BloomFilter.into()
853                            }
854                        },
855                    })),
856                },
857            };
858            AlterTableKind::SetIndexes(SetIndexes {
859                set_indexes: vec![option],
860            })
861        }
862        AlterTableOperation::UnsetIndex { options } => {
863            let option = match options {
864                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
865                    UnsetIndex {
866                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
867                            column_name: column_name.value,
868                        })),
869                    }
870                }
871                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
872                    UnsetIndex {
873                        options: Some(unset_index::Options::Inverted(UnsetInverted {
874                            column_name: column_name.value,
875                        })),
876                    }
877                }
878                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
879                    UnsetIndex {
880                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
881                            column_name: column_name.value,
882                        })),
883                    }
884                }
885            };
886
887            AlterTableKind::UnsetIndexes(UnsetIndexes {
888                unset_indexes: vec![option],
889            })
890        }
891        AlterTableOperation::DropDefaults { columns } => {
892            AlterTableKind::DropDefaults(DropDefaults {
893                drop_defaults: columns
894                    .into_iter()
895                    .map(|col| {
896                        let column_name = col.0.to_string();
897                        Ok(api::v1::DropDefault { column_name })
898                    })
899                    .collect::<Result<Vec<_>>>()?,
900            })
901        }
902        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
903            set_defaults: defaults
904                .into_iter()
905                .map(|col| {
906                    let column_name = col.column_name.to_string();
907                    let default_constraint = serde_json::to_string(&col.default_constraint)
908                        .context(EncodeJsonSnafu)?
909                        .into_bytes();
910                    Ok(api::v1::SetDefault {
911                        column_name,
912                        default_constraint,
913                    })
914                })
915                .collect::<Result<Vec<_>>>()?,
916        }),
917    };
918
919    Ok(AlterTableExpr {
920        catalog_name,
921        schema_name,
922        table_name,
923        kind: Some(kind),
924    })
925}
926
927/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
928pub fn to_alter_database_expr(
929    alter_database: AlterDatabase,
930    query_ctx: &QueryContextRef,
931) -> Result<AlterDatabaseExpr> {
932    let catalog = query_ctx.current_catalog();
933    let schema = alter_database.database_name;
934
935    let kind = match alter_database.alter_operation {
936        AlterDatabaseOperation::SetDatabaseOption { options } => {
937            let options = options.into_iter().map(Into::into).collect();
938            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
939                set_database_options: options,
940            })
941        }
942        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
943            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
944        }
945    };
946
947    Ok(AlterDatabaseExpr {
948        catalog_name: catalog.to_string(),
949        schema_name: schema.to_string(),
950        kind: Some(kind),
951    })
952}
953
954/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
955pub fn to_create_view_expr(
956    stmt: CreateView,
957    logical_plan: Vec<u8>,
958    table_names: Vec<TableName>,
959    columns: Vec<String>,
960    plan_columns: Vec<String>,
961    definition: String,
962    query_ctx: QueryContextRef,
963) -> Result<CreateViewExpr> {
964    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
965        .map_err(BoxedError::new)
966        .context(ExternalSnafu)?;
967
968    let expr = CreateViewExpr {
969        catalog_name,
970        schema_name,
971        view_name,
972        logical_plan,
973        create_if_not_exists: stmt.if_not_exists,
974        or_replace: stmt.or_replace,
975        table_names,
976        columns,
977        plan_columns,
978        definition,
979    };
980
981    Ok(expr)
982}
983
984pub fn to_create_flow_task_expr(
985    create_flow: CreateFlow,
986    query_ctx: &QueryContextRef,
987) -> Result<CreateFlowExpr> {
988    // retrieve sink table name
989    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
990        .with_context(|_| ConvertIdentifierSnafu {
991            ident: create_flow.sink_table_name.to_string(),
992        })?;
993    let catalog = sink_table_ref
994        .catalog()
995        .unwrap_or(query_ctx.current_catalog())
996        .to_string();
997    let schema = sink_table_ref
998        .schema()
999        .map(|s| s.to_owned())
1000        .unwrap_or(query_ctx.current_schema());
1001
1002    let sink_table_name = TableName {
1003        catalog_name: catalog,
1004        schema_name: schema,
1005        table_name: sink_table_ref.table().to_string(),
1006    };
1007
1008    let source_table_names = extract_tables_from_query(&create_flow.query)
1009        .map(|name| {
1010            let reference =
1011                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1012                    ConvertIdentifierSnafu {
1013                        ident: name.to_string(),
1014                    }
1015                })?;
1016            let catalog = reference
1017                .catalog()
1018                .unwrap_or(query_ctx.current_catalog())
1019                .to_string();
1020            let schema = reference
1021                .schema()
1022                .map(|s| s.to_string())
1023                .unwrap_or(query_ctx.current_schema());
1024
1025            let table_name = TableName {
1026                catalog_name: catalog,
1027                schema_name: schema,
1028                table_name: reference.table().to_string(),
1029            };
1030            Ok(table_name)
1031        })
1032        .collect::<Result<Vec<_>>>()?;
1033
1034    let eval_interval = create_flow.eval_interval;
1035
1036    Ok(CreateFlowExpr {
1037        catalog_name: query_ctx.current_catalog().to_string(),
1038        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1039        source_table_names,
1040        sink_table_name: Some(sink_table_name),
1041        or_replace: create_flow.or_replace,
1042        create_if_not_exists: create_flow.if_not_exists,
1043        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1044        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1045        comment: create_flow.comment.unwrap_or_default(),
1046        sql: create_flow.query.to_string(),
1047        flow_options: Default::default(),
1048    })
1049}
1050
1051/// sanitize the flow name, remove possible quotes
1052fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1053    ensure!(
1054        flow_name.0.len() == 1,
1055        InvalidFlowNameSnafu {
1056            name: flow_name.to_string(),
1057        }
1058    );
1059    // safety: we've checked flow_name.0 has exactly one element.
1060    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1066    use datatypes::value::Value;
1067    use session::context::{QueryContext, QueryContextBuilder};
1068    use sql::dialect::GreptimeDbDialect;
1069    use sql::parser::{ParseOptions, ParserContext};
1070    use sql::statements::statement::Statement;
1071    use store_api::storage::ColumnDefaultConstraint;
1072
1073    use super::*;
1074
    /// TQL-based flows: an absolute-time `TQL EVAL` range fails to parse,
    /// while a `now()`-relative range converts into a `CreateFlowExpr` whose
    /// source-table list is empty (no tables are extracted from TQL queries).
    #[test]
    fn test_create_flow_tql_expr() {
        // Absolute time range: expected to be rejected at parse time.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());

        assert!(
            stmt.is_err(),
            "Expected error for invalid TQL EVAL parameters: {:#?}",
            stmt
        );

        // Relative (now()-based) time range: parses and converts fine.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Renders a TableName as `catalog.schema.table` for compact asserts.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("calc_reqs", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.cnt_reqs",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        // Source tables are not extracted from a TQL query body.
        assert!(expr.source_table_names.is_empty());
        // The stored SQL is the query body without the trailing semicolon.
        assert_eq!(
            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
            expr.sql
        );
    }
1117
    /// SQL-based flows: source tables are extracted from the query and fully
    /// qualified against the query context, quoted flow names are unquoted,
    /// and a catalog-qualified flow name is rejected.
    #[test]
    fn test_create_flow_expr() {
        // Unqualified sink/source tables default to greptime.public.
        let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Renders a TableName as `catalog.schema.table` for compact asserts.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("test_distinct_basic", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.out_distinct_basic",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.distinct_basic",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        // The stored SQL is the query body without the trailing semicolon.
        assert_eq!(
            r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
            expr.sql
        );

        // Quoted flow name plus schema-qualified sink/source tables.
        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        // Backquotes are stripped from the flow name.
        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);

        // A flow name qualified with a catalog part is invalid.
        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());

        assert!(res.is_err());
        assert!(
            res.unwrap_err()
                .to_string()
                .contains("Invalid flow name: abc.`task_2`")
        );
    }
1212
1213    #[test]
1214    fn test_create_to_expr() {
1215        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1216        let stmt =
1217            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1218                .unwrap()
1219                .pop()
1220                .unwrap();
1221
1222        let Statement::CreateTable(create_table) = stmt else {
1223            unreachable!()
1224        };
1225        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1226        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1227        assert_eq!(
1228            "1.0MiB",
1229            expr.table_options.get("write_buffer_size").unwrap()
1230        );
1231    }
1232
1233    #[test]
1234    fn test_invalid_create_to_expr() {
1235        let cases = [
1236            // duplicate column declaration
1237            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1238            // duplicate primary key
1239            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1240            // time index is primary key
1241            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1242        ];
1243
1244        for sql in cases {
1245            let stmt = ParserContext::create_with_dialect(
1246                sql,
1247                &GreptimeDbDialect {},
1248                ParseOptions::default(),
1249            )
1250            .unwrap()
1251            .pop()
1252            .unwrap();
1253            let Statement::CreateTable(create_table) = stmt else {
1254                unreachable!()
1255            };
1256            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1257        }
1258    }
1259
    /// A string default for a timestamp column is interpreted in the query
    /// context's timezone when the column's default constraint is built.
    #[test]
    fn test_create_to_expr_with_default_timestamp_value() {
        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(create_table) = stmt else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // query context with timezone `+08:00`: the same literal now maps to
        // a UTC instant eight hours earlier.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = create_to_expr(&create_table, &ctx).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1295
1296    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1297        assert_eq!("ts", ts_column.name);
1298        assert_eq!(
1299            ColumnDataType::TimestampMillisecond as i32,
1300            ts_column.data_type
1301        );
1302        assert!(!ts_column.default_constraint.is_empty());
1303
1304        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1305    }
1306
    /// `ALTER DATABASE SET/UNSET` and `ALTER TABLE ADD COLUMN` conversions;
    /// the added column's string default is parsed in the session timezone.
    #[test]
    fn test_to_alter_expr() {
        // ALTER DATABASE ... SET: every key/value pair is carried over.
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        // ALTER DATABASE ... UNSET: the key list is carried over.
        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        // ALTER TABLE ... ADD COLUMN with a string timestamp default.
        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // query context with timezone `+08:00`: same literal, UTC instant
        // shifted eight hours earlier.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1406
1407    #[test]
1408    fn test_to_alter_modify_column_type_expr() {
1409        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1410        let stmt =
1411            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1412                .unwrap()
1413                .pop()
1414                .unwrap();
1415
1416        let Statement::AlterTable(alter_table) = stmt else {
1417            unreachable!()
1418        };
1419
1420        // query context with system timezone UTC.
1421        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1422        let kind = expr.kind.unwrap();
1423
1424        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1425            modify_column_types,
1426        }) = kind
1427        else {
1428            unreachable!()
1429        };
1430
1431        assert_eq!(1, modify_column_types.len());
1432        let modify_column_type = &modify_column_types[0];
1433
1434        assert_eq!("mem_usage", modify_column_type.column_name);
1435        assert_eq!(
1436            ColumnDataType::String as i32,
1437            modify_column_type.target_type
1438        );
1439        assert!(modify_column_type.target_type_extension.is_none());
1440    }
1441
    /// `ALTER TABLE ... REPARTITION (...) INTO (...)` converts into a
    /// repartition request carrying the fully qualified table name and both
    /// partition-expression lists verbatim.
    #[test]
    fn test_to_repartition_request() {
        let sql = r#"
ALTER TABLE metrics REPARTITION (
  device_id < 100
) INTO (
  device_id < 100 AND area < 'South',
  device_id < 100 AND area >= 'South'
);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
        // Unqualified table name is resolved against the default context.
        assert_eq!("greptime", request.catalog_name);
        assert_eq!("public", request.schema_name);
        assert_eq!("metrics", request.table_name);
        assert_eq!(
            request
                .from_exprs
                .into_iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>(),
            vec!["device_id < 100".to_string()]
        );
        assert_eq!(
            request
                .into_exprs
                .into_iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>(),
            vec![
                "device_id < 100 AND area < 'South'".to_string(),
                "device_id < 100 AND area >= 'South'".to_string()
            ]
        );
    }
1485
1486    fn new_test_table_names() -> Vec<TableName> {
1487        vec![
1488            TableName {
1489                catalog_name: "greptime".to_string(),
1490                schema_name: "public".to_string(),
1491                table_name: "a_table".to_string(),
1492            },
1493            TableName {
1494                catalog_name: "greptime".to_string(),
1495                schema_name: "public".to_string(),
1496                table_name: "b_table".to_string(),
1497            },
1498        ]
1499    }
1500
1501    #[test]
1502    fn test_to_create_view_expr() {
1503        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1504        let stmt =
1505            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1506                .unwrap()
1507                .pop()
1508                .unwrap();
1509
1510        let Statement::CreateView(stmt) = stmt else {
1511            unreachable!()
1512        };
1513
1514        let logical_plan = vec![1, 2, 3];
1515        let table_names = new_test_table_names();
1516        let columns = vec!["a".to_string()];
1517        let plan_columns = vec!["number".to_string()];
1518
1519        let expr = to_create_view_expr(
1520            stmt,
1521            logical_plan.clone(),
1522            table_names.clone(),
1523            columns.clone(),
1524            plan_columns.clone(),
1525            sql.to_string(),
1526            QueryContext::arc(),
1527        )
1528        .unwrap();
1529
1530        assert_eq!("greptime", expr.catalog_name);
1531        assert_eq!("public", expr.schema_name);
1532        assert_eq!("test", expr.view_name);
1533        assert!(!expr.create_if_not_exists);
1534        assert!(!expr.or_replace);
1535        assert_eq!(logical_plan, expr.logical_plan);
1536        assert_eq!(table_names, expr.table_names);
1537        assert_eq!(sql, expr.definition);
1538        assert_eq!(columns, expr.columns);
1539        assert_eq!(plan_columns, expr.plan_columns);
1540    }
1541
1542    #[test]
1543    fn test_to_create_view_expr_complex() {
1544        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1545        let stmt =
1546            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1547                .unwrap()
1548                .pop()
1549                .unwrap();
1550
1551        let Statement::CreateView(stmt) = stmt else {
1552            unreachable!()
1553        };
1554
1555        let logical_plan = vec![1, 2, 3];
1556        let table_names = new_test_table_names();
1557        let columns = vec!["a".to_string()];
1558        let plan_columns = vec!["number".to_string()];
1559
1560        let expr = to_create_view_expr(
1561            stmt,
1562            logical_plan.clone(),
1563            table_names.clone(),
1564            columns.clone(),
1565            plan_columns.clone(),
1566            sql.to_string(),
1567            QueryContext::arc(),
1568        )
1569        .unwrap();
1570
1571        assert_eq!("greptime", expr.catalog_name);
1572        assert_eq!("test", expr.schema_name);
1573        assert_eq!("test_view", expr.view_name);
1574        assert!(expr.create_if_not_exists);
1575        assert!(expr.or_replace);
1576        assert_eq!(logical_plan, expr.logical_plan);
1577        assert_eq!(table_names, expr.table_names);
1578        assert_eq!(sql, expr.definition);
1579        assert_eq!(columns, expr.columns);
1580        assert_eq!(plan_columns, expr.plan_columns);
1581    }
1582
    /// Round-trip: `CreateTable` -> `CreateTableExpr` -> `CreateTable` must
    /// reproduce the original canonical SQL text, including all per-column
    /// index options and the table options.
    #[test]
    fn test_expr_to_create() {
        // The fixture is already in canonical formatting so the round-tripped
        // text can be compared byte-for-byte.
        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
  `timestamp` TIMESTAMP(9) NOT NULL,
  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
  `username` STRING NULL,
  `http_method` STRING NULL INVERTED INDEX,
  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
  `protocol` STRING NULL,
  `status_code` INT NULL INVERTED INDEX,
  `response_size` BIGINT NULL,
  `message` STRING NULL,
  TIME INDEX (`timestamp`),
  PRIMARY KEY (`username`, `status_code`)
)
ENGINE=mito
WITH(
  append_mode = 'true'
)"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(original_create) = stmt else {
            unreachable!()
        };

        // Convert CreateTable -> CreateTableExpr -> CreateTable
        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();

        let create_table = expr_to_create(&expr, Some('`')).unwrap();
        let new_sql = format!("{:#}", create_table);
        assert_eq!(sql, new_sql);
    }
1619}