sql/statements/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17
18use common_catalog::consts::FILE_ENGINE;
19use datatypes::json::JsonStructureSettings;
20use datatypes::schema::{
21    FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
22    VectorIndexOptions,
23};
24use itertools::Itertools;
25use serde::Serialize;
26use snafu::ResultExt;
27use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
28use sqlparser_derive::{Visit, VisitMut};
29
30use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
31use crate::error::{
32    InvalidFlowQuerySnafu, InvalidSqlSnafu, Result, SetFulltextOptionSnafu,
33    SetSkippingIndexOptionSnafu,
34};
35use crate::statements::OptionMap;
36use crate::statements::statement::Statement;
37use crate::statements::tql::Tql;
38use crate::util::OptionValue;
39
40const LINE_SEP: &str = ",\n";
41const COMMA_SEP: &str = ", ";
42const INDENT: usize = 2;
43pub const VECTOR_OPT_DIM: &str = "dim";
44
45pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
46pub const JSON_OPT_FORMAT: &str = "format";
47pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
48pub const JSON_FORMAT_RAW: &str = "raw";
49pub const JSON_FORMAT_PARTIAL: &str = "partial";
50
51macro_rules! format_indent {
52    ($fmt: expr, $arg: expr) => {
53        format!($fmt, format_args!("{: >1$}", "", INDENT), $arg)
54    };
55    ($arg: expr) => {
56        format_indent!("{}{}", $arg)
57    };
58}
59
60macro_rules! format_list_indent {
61    ($list: expr) => {
62        $list.iter().map(|e| format_indent!(e)).join(LINE_SEP)
63    };
64}
65
66macro_rules! format_list_comma {
67    ($list: expr) => {
68        $list.iter().map(|e| format!("{}", e)).join(COMMA_SEP)
69    };
70}
71
72#[cfg(feature = "enterprise")]
73pub mod trigger;
74
75fn format_table_constraint(constraints: &[TableConstraint]) -> String {
76    constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
77}
78
79/// Table constraint for create table statement.
80#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
81pub enum TableConstraint {
82    /// Primary key constraint.
83    PrimaryKey { columns: Vec<Ident> },
84    /// Time index constraint.
85    TimeIndex { column: Ident },
86}
87
88impl Display for TableConstraint {
89    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
90        match self {
91            TableConstraint::PrimaryKey { columns } => {
92                write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
93            }
94            TableConstraint::TimeIndex { column } => {
95                write!(f, "TIME INDEX ({})", column)
96            }
97        }
98    }
99}
100
101#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
102pub struct CreateTable {
103    /// Create if not exists
104    pub if_not_exists: bool,
105    pub table_id: u32,
106    /// Table name
107    pub name: ObjectName,
108    pub columns: Vec<Column>,
109    pub engine: String,
110    pub constraints: Vec<TableConstraint>,
111    /// Table options in `WITH`. All keys are lowercase.
112    pub options: OptionMap,
113    pub partitions: Option<Partitions>,
114}
115
116/// Column definition in `CREATE TABLE` statement.
117#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
118pub struct Column {
119    /// `ColumnDef` from `sqlparser::ast`
120    pub column_def: ColumnDef,
121    /// Column extensions for greptimedb dialect.
122    pub extensions: ColumnExtensions,
123}
124
125/// Column extensions for greptimedb dialect.
126#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
127pub struct ColumnExtensions {
128    /// Vector type options.
129    pub vector_options: Option<OptionMap>,
130
131    /// Fulltext index options.
132    pub fulltext_index_options: Option<OptionMap>,
133    /// Skipping index options.
134    pub skipping_index_options: Option<OptionMap>,
135    /// Inverted index options.
136    ///
137    /// Inverted index doesn't have options at present. There won't be any options in that map.
138    pub inverted_index_options: Option<OptionMap>,
139    /// Vector index options for HNSW-based vector similarity search.
140    pub vector_index_options: Option<OptionMap>,
141    pub json_datatype_options: Option<OptionMap>,
142}
143
144impl Column {
145    pub fn name(&self) -> &Ident {
146        &self.column_def.name
147    }
148
149    pub fn data_type(&self) -> &DataType {
150        &self.column_def.data_type
151    }
152
153    pub fn mut_data_type(&mut self) -> &mut DataType {
154        &mut self.column_def.data_type
155    }
156
157    pub fn options(&self) -> &[ColumnOptionDef] {
158        &self.column_def.options
159    }
160
161    pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
162        &mut self.column_def.options
163    }
164}
165
166impl Display for Column {
167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168        if let Some(vector_options) = &self.extensions.vector_options
169            && let Some(dim) = vector_options.get(VECTOR_OPT_DIM)
170        {
171            write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
172            return Ok(());
173        }
174
175        write!(f, "{} {}", self.column_def.name, self.column_def.data_type)?;
176        if let Some(options) = &self.extensions.json_datatype_options {
177            write!(
178                f,
179                "({})",
180                options
181                    .entries()
182                    .map(|(k, v)| format!("{k} = {v}"))
183                    .join(COMMA_SEP)
184            )?;
185        }
186        for option in &self.column_def.options {
187            write!(f, " {option}")?;
188        }
189
190        if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
191            if !fulltext_options.is_empty() {
192                let options = fulltext_options.kv_pairs();
193                write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
194            } else {
195                write!(f, " FULLTEXT INDEX")?;
196            }
197        }
198
199        if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
200            if !skipping_index_options.is_empty() {
201                let options = skipping_index_options.kv_pairs();
202                write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
203            } else {
204                write!(f, " SKIPPING INDEX")?;
205            }
206        }
207
208        if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
209            if !inverted_index_options.is_empty() {
210                let options = inverted_index_options.kv_pairs();
211                write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
212            } else {
213                write!(f, " INVERTED INDEX")?;
214            }
215        }
216
217        if let Some(vector_index_options) = &self.extensions.vector_index_options {
218            if !vector_index_options.is_empty() {
219                let options = vector_index_options.kv_pairs();
220                write!(f, " VECTOR INDEX WITH({})", format_list_comma!(options))?;
221            } else {
222                write!(f, " VECTOR INDEX")?;
223            }
224        }
225        Ok(())
226    }
227}
228
229impl ColumnExtensions {
230    pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
231        let Some(options) = self.fulltext_index_options.as_ref() else {
232            return Ok(None);
233        };
234
235        let options: HashMap<String, String> = options.clone().into_map();
236        Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
237    }
238
239    pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
240        let Some(options) = self.skipping_index_options.as_ref() else {
241            return Ok(None);
242        };
243
244        let options: HashMap<String, String> = options.clone().into_map();
245        Ok(Some(
246            options.try_into().context(SetSkippingIndexOptionSnafu)?,
247        ))
248    }
249
250    pub fn build_vector_index_options(&self) -> Result<Option<VectorIndexOptions>> {
251        let Some(options) = self.vector_index_options.as_ref() else {
252            return Ok(None);
253        };
254
255        let options_map: HashMap<String, String> = options.clone().into_map();
256        let mut result = VectorIndexOptions::default();
257
258        if let Some(s) = options_map.get("engine") {
259            result.engine = s.parse::<VectorIndexEngineType>().map_err(|e| {
260                InvalidSqlSnafu {
261                    msg: format!("invalid VECTOR INDEX engine: {e}"),
262                }
263                .build()
264            })?;
265        }
266
267        if let Some(s) = options_map.get("metric") {
268            result.metric = s.parse::<VectorDistanceMetric>().map_err(|e| {
269                InvalidSqlSnafu {
270                    msg: format!("invalid VECTOR INDEX metric: {e}"),
271                }
272                .build()
273            })?;
274        }
275
276        if let Some(s) = options_map.get("connectivity") {
277            let value = s.parse::<u32>().map_err(|_| {
278                InvalidSqlSnafu {
279                    msg: format!(
280                        "invalid VECTOR INDEX connectivity: {s}, expected positive integer"
281                    ),
282                }
283                .build()
284            })?;
285            if !(2..=2048).contains(&value) {
286                return InvalidSqlSnafu {
287                    msg: "VECTOR INDEX connectivity must be in the range [2, 2048].".to_string(),
288                }
289                .fail();
290            }
291            result.connectivity = value;
292        }
293
294        if let Some(s) = options_map.get("expansion_add") {
295            let value = s.parse::<u32>().map_err(|_| {
296                InvalidSqlSnafu {
297                    msg: format!(
298                        "invalid VECTOR INDEX expansion_add: {s}, expected positive integer"
299                    ),
300                }
301                .build()
302            })?;
303            if value == 0 {
304                return InvalidSqlSnafu {
305                    msg: "VECTOR INDEX expansion_add must be greater than 0".to_string(),
306                }
307                .fail();
308            }
309            result.expansion_add = value;
310        }
311
312        if let Some(s) = options_map.get("expansion_search") {
313            let value = s.parse::<u32>().map_err(|_| {
314                InvalidSqlSnafu {
315                    msg: format!(
316                        "invalid VECTOR INDEX expansion_search: {s}, expected positive integer"
317                    ),
318                }
319                .build()
320            })?;
321            if value == 0 {
322                return InvalidSqlSnafu {
323                    msg: "VECTOR INDEX expansion_search must be greater than 0".to_string(),
324                }
325                .fail();
326            }
327            result.expansion_search = value;
328        }
329
330        Ok(Some(result))
331    }
332
333    pub fn build_json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
334        let Some(options) = self.json_datatype_options.as_ref() else {
335            return Ok(None);
336        };
337
338        let unstructured_keys = options
339            .value(JSON_OPT_UNSTRUCTURED_KEYS)
340            .and_then(|v| {
341                v.as_list().map(|x| {
342                    x.into_iter()
343                        .map(|x| x.to_string())
344                        .collect::<HashSet<String>>()
345                })
346            })
347            .unwrap_or_default();
348
349        options
350            .get(JSON_OPT_FORMAT)
351            .map(|format| match format {
352                JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(None)),
353                JSON_FORMAT_PARTIAL => Ok(JsonStructureSettings::PartialUnstructuredByKey {
354                    fields: None,
355                    unstructured_keys,
356                }),
357                JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
358                _ => InvalidSqlSnafu {
359                    msg: format!("unknown JSON datatype 'format': {format}"),
360                }
361                .fail(),
362            })
363            .transpose()
364    }
365
366    pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) {
367        let mut map = OptionMap::default();
368
369        let format = match settings {
370            JsonStructureSettings::Structured(_) => JSON_FORMAT_FULL_STRUCTURED,
371            JsonStructureSettings::PartialUnstructuredByKey { .. } => JSON_FORMAT_PARTIAL,
372            JsonStructureSettings::UnstructuredRaw => JSON_FORMAT_RAW,
373        };
374        map.insert(JSON_OPT_FORMAT.to_string(), format.to_string());
375
376        if let JsonStructureSettings::PartialUnstructuredByKey {
377            fields: _,
378            unstructured_keys,
379        } = settings
380        {
381            let value = OptionValue::from(
382                unstructured_keys
383                    .iter()
384                    .map(|x| x.as_str())
385                    .sorted()
386                    .collect::<Vec<_>>(),
387            );
388            map.insert_options(JSON_OPT_UNSTRUCTURED_KEYS, value);
389        }
390
391        self.json_datatype_options = Some(map);
392    }
393}
394
395/// Partition on columns or values.
396///
397/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
398/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
399///   `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
400///   Each expression stands for a partition.
401#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
402pub struct Partitions {
403    pub column_list: Vec<Ident>,
404    pub exprs: Vec<Expr>,
405}
406
407impl Partitions {
408    /// set quotes to all [Ident]s from column list
409    pub fn set_quote(&mut self, quote_style: char) {
410        self.column_list
411            .iter_mut()
412            .for_each(|c| c.quote_style = Some(quote_style));
413    }
414}
415
416#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
417pub struct PartitionEntry {
418    pub name: Ident,
419    pub value_list: Vec<SqlValue>,
420}
421
422impl Display for PartitionEntry {
423    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
424        write!(
425            f,
426            "PARTITION {} VALUES LESS THAN ({})",
427            self.name,
428            format_list_comma!(self.value_list),
429        )
430    }
431}
432
433impl Display for Partitions {
434    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
435        if !self.column_list.is_empty() {
436            write!(
437                f,
438                "PARTITION ON COLUMNS ({}) (\n{}\n)",
439                format_list_comma!(self.column_list),
440                format_list_indent!(self.exprs),
441            )?;
442        }
443        Ok(())
444    }
445}
446
447impl Display for CreateTable {
448    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
449        write!(f, "CREATE ")?;
450        if self.engine == FILE_ENGINE {
451            write!(f, "EXTERNAL ")?;
452        }
453        write!(f, "TABLE ")?;
454        if self.if_not_exists {
455            write!(f, "IF NOT EXISTS ")?;
456        }
457        writeln!(f, "{} (", &self.name)?;
458        writeln!(f, "{},", format_list_indent!(self.columns))?;
459        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
460        writeln!(f, ")")?;
461        if let Some(partitions) = &self.partitions {
462            writeln!(f, "{partitions}")?;
463        }
464        writeln!(f, "ENGINE={}", &self.engine)?;
465        if !self.options.is_empty() {
466            let options = self.options.kv_pairs();
467            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
468        }
469        Ok(())
470    }
471}
472
473#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
474pub struct CreateDatabase {
475    pub name: ObjectName,
476    /// Create if not exists
477    pub if_not_exists: bool,
478    pub options: OptionMap,
479}
480
481impl CreateDatabase {
482    /// Creates a statement for `CREATE DATABASE`
483    pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
484        Self {
485            name,
486            if_not_exists,
487            options,
488        }
489    }
490}
491
492impl Display for CreateDatabase {
493    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
494        write!(f, "CREATE DATABASE ")?;
495        if self.if_not_exists {
496            write!(f, "IF NOT EXISTS ")?;
497        }
498        write!(f, "{}", &self.name)?;
499        if !self.options.is_empty() {
500            let options = self.options.kv_pairs();
501            write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
502        }
503        Ok(())
504    }
505}
506
507#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
508pub struct CreateExternalTable {
509    /// Table name
510    pub name: ObjectName,
511    pub columns: Vec<Column>,
512    pub constraints: Vec<TableConstraint>,
513    /// Table options in `WITH`. All keys are lowercase.
514    pub options: OptionMap,
515    pub if_not_exists: bool,
516    pub engine: String,
517}
518
519impl Display for CreateExternalTable {
520    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
521        write!(f, "CREATE EXTERNAL TABLE ")?;
522        if self.if_not_exists {
523            write!(f, "IF NOT EXISTS ")?;
524        }
525        writeln!(f, "{} (", &self.name)?;
526        writeln!(f, "{},", format_list_indent!(self.columns))?;
527        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
528        writeln!(f, ")")?;
529        writeln!(f, "ENGINE={}", &self.engine)?;
530        if !self.options.is_empty() {
531            let options = self.options.kv_pairs();
532            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
533        }
534        Ok(())
535    }
536}
537
538#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
539pub struct CreateTableLike {
540    /// Table name
541    pub table_name: ObjectName,
542    /// The table that is designated to be imitated by `Like`
543    pub source_name: ObjectName,
544}
545
546impl Display for CreateTableLike {
547    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
548        let table_name = &self.table_name;
549        let source_name = &self.source_name;
550        write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
551    }
552}
553
554#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
555pub struct CreateFlow {
556    /// Flow name
557    pub flow_name: ObjectName,
558    /// Output (sink) table name
559    pub sink_table_name: ObjectName,
560    /// Whether to replace existing task
561    pub or_replace: bool,
562    /// Create if not exist
563    pub if_not_exists: bool,
564    /// `EXPIRE AFTER`
565    /// Duration in second as `i64`
566    pub expire_after: Option<i64>,
567    /// Duration for flow evaluation interval
568    /// Duration in seconds as `i64`
569    /// If not set, flow will be evaluated based on time window size and other args.
570    pub eval_interval: Option<i64>,
571    /// Comment string
572    pub comment: Option<String>,
573    /// SQL statement
574    pub query: Box<SqlOrTql>,
575}
576
577/// Either a sql query or a tql query
578#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
579pub enum SqlOrTql {
580    Sql(Query, String),
581    Tql(Tql, String),
582}
583
584impl std::fmt::Display for SqlOrTql {
585    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
586        match self {
587            Self::Sql(_, s) => write!(f, "{}", s),
588            Self::Tql(_, s) => write!(f, "{}", s),
589        }
590    }
591}
592
593impl SqlOrTql {
594    pub fn try_from_statement(
595        value: Statement,
596        original_query: &str,
597    ) -> std::result::Result<Self, crate::error::Error> {
598        match value {
599            Statement::Query(query) => {
600                Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
601            }
602            Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
603            _ => InvalidFlowQuerySnafu {
604                reason: format!("Expect either sql query or promql query, found {:?}", value),
605            }
606            .fail(),
607        }
608    }
609}
610
611impl Display for CreateFlow {
612    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
613        write!(f, "CREATE ")?;
614        if self.or_replace {
615            write!(f, "OR REPLACE ")?;
616        }
617        write!(f, "FLOW ")?;
618        if self.if_not_exists {
619            write!(f, "IF NOT EXISTS ")?;
620        }
621        writeln!(f, "{}", &self.flow_name)?;
622        writeln!(f, "SINK TO {}", &self.sink_table_name)?;
623        if let Some(expire_after) = &self.expire_after {
624            writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
625        }
626        if let Some(eval_interval) = &self.eval_interval {
627            writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
628        }
629        if let Some(comment) = &self.comment {
630            writeln!(f, "COMMENT '{}'", comment)?;
631        }
632        write!(f, "AS {}", &self.query)
633    }
634}
635
636/// Create SQL view statement.
637#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
638pub struct CreateView {
639    /// View name
640    pub name: ObjectName,
641    /// An optional list of names to be used for columns of the view
642    pub columns: Vec<Ident>,
643    /// The clause after `As` that defines the VIEW.
644    /// Can only be either [Statement::Query] or [Statement::Tql].
645    pub query: Box<Statement>,
646    /// Whether to replace existing VIEW
647    pub or_replace: bool,
648    /// Create VIEW only when it doesn't exists
649    pub if_not_exists: bool,
650}
651
652impl Display for CreateView {
653    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
654        write!(f, "CREATE ")?;
655        if self.or_replace {
656            write!(f, "OR REPLACE ")?;
657        }
658        write!(f, "VIEW ")?;
659        if self.if_not_exists {
660            write!(f, "IF NOT EXISTS ")?;
661        }
662        write!(f, "{} ", &self.name)?;
663        if !self.columns.is_empty() {
664            write!(f, "({}) ", format_list_comma!(self.columns))?;
665        }
666        write!(f, "AS {}", &self.query)
667    }
668}
669
670#[cfg(test)]
671mod tests {
672    use std::assert_matches::assert_matches;
673
674    use crate::dialect::GreptimeDbDialect;
675    use crate::error::Error;
676    use crate::parser::{ParseOptions, ParserContext};
677    use crate::statements::statement::Statement;
678
679    #[test]
680    fn test_display_create_table() {
681        let sql = r"create table if not exists demo(
682                             host string,
683                             ts timestamp,
684                             cpu double default 0,
685                             memory double,
686                             TIME INDEX (ts),
687                             PRIMARY KEY(host)
688                       )
689                       PARTITION ON COLUMNS (host) (
690                            host = 'a',
691                            host > 'a',
692                       )
693                       engine=mito
694                       with(ttl='7d', storage='File');
695         ";
696        let result =
697            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
698                .unwrap();
699        assert_eq!(1, result.len());
700
701        match &result[0] {
702            Statement::CreateTable(c) => {
703                let new_sql = format!("\n{}", c);
704                assert_eq!(
705                    r#"
706CREATE TABLE IF NOT EXISTS demo (
707  host STRING,
708  ts TIMESTAMP,
709  cpu DOUBLE DEFAULT 0,
710  memory DOUBLE,
711  TIME INDEX (ts),
712  PRIMARY KEY (host)
713)
714PARTITION ON COLUMNS (host) (
715  host = 'a',
716  host > 'a'
717)
718ENGINE=mito
719WITH(
720  storage = 'File',
721  ttl = '7d'
722)"#,
723                    &new_sql
724                );
725
726                let new_result = ParserContext::create_with_dialect(
727                    &new_sql,
728                    &GreptimeDbDialect {},
729                    ParseOptions::default(),
730                )
731                .unwrap();
732                assert_eq!(result, new_result);
733            }
734            _ => unreachable!(),
735        }
736    }
737
738    #[test]
739    fn test_display_empty_partition_column() {
740        let sql = r"create table if not exists demo(
741            host string,
742            ts timestamp,
743            cpu double default 0,
744            memory double,
745            TIME INDEX (ts),
746            PRIMARY KEY(ts, host)
747            );
748        ";
749        let result =
750            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
751                .unwrap();
752        assert_eq!(1, result.len());
753
754        match &result[0] {
755            Statement::CreateTable(c) => {
756                let new_sql = format!("\n{}", c);
757                assert_eq!(
758                    r#"
759CREATE TABLE IF NOT EXISTS demo (
760  host STRING,
761  ts TIMESTAMP,
762  cpu DOUBLE DEFAULT 0,
763  memory DOUBLE,
764  TIME INDEX (ts),
765  PRIMARY KEY (ts, host)
766)
767ENGINE=mito
768"#,
769                    &new_sql
770                );
771
772                let new_result = ParserContext::create_with_dialect(
773                    &new_sql,
774                    &GreptimeDbDialect {},
775                    ParseOptions::default(),
776                )
777                .unwrap();
778                assert_eq!(result, new_result);
779            }
780            _ => unreachable!(),
781        }
782    }
783
784    #[test]
785    fn test_validate_table_options() {
786        let sql = r"create table if not exists demo(
787            host string,
788            ts timestamp,
789            cpu double default 0,
790            memory double,
791            TIME INDEX (ts),
792            PRIMARY KEY(host)
793      )
794      PARTITION ON COLUMNS (host) ()
795      engine=mito
796      with(ttl='7d', 'compaction.type'='world');
797";
798        let result =
799            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
800                .unwrap();
801        match &result[0] {
802            Statement::CreateTable(c) => {
803                assert_eq!(2, c.options.len());
804            }
805            _ => unreachable!(),
806        }
807
808        let sql = r"create table if not exists demo(
809            host string,
810            ts timestamp,
811            cpu double default 0,
812            memory double,
813            TIME INDEX (ts),
814            PRIMARY KEY(host)
815      )
816      PARTITION ON COLUMNS (host) ()
817      engine=mito
818      with(ttl='7d', hello='world');
819";
820        let result =
821            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
822        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
823    }
824
825    #[test]
826    fn test_display_create_database() {
827        let sql = r"create database test;";
828        let stmts =
829            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
830                .unwrap();
831        assert_eq!(1, stmts.len());
832        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
833
834        match &stmts[0] {
835            Statement::CreateDatabase(set) => {
836                let new_sql = format!("\n{}", set);
837                assert_eq!(
838                    r#"
839CREATE DATABASE test"#,
840                    &new_sql
841                );
842            }
843            _ => {
844                unreachable!();
845            }
846        }
847
848        let sql = r"create database if not exists test;";
849        let stmts =
850            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
851                .unwrap();
852        assert_eq!(1, stmts.len());
853        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
854
855        match &stmts[0] {
856            Statement::CreateDatabase(set) => {
857                let new_sql = format!("\n{}", set);
858                assert_eq!(
859                    r#"
860CREATE DATABASE IF NOT EXISTS test"#,
861                    &new_sql
862                );
863            }
864            _ => {
865                unreachable!();
866            }
867        }
868
869        let sql = r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#;
870        let stmts =
871            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
872                .unwrap();
873        assert_eq!(1, stmts.len());
874        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
875
876        match &stmts[0] {
877            Statement::CreateDatabase(set) => {
878                let new_sql = format!("\n{}", set);
879                assert_eq!(
880                    r#"
881CREATE DATABASE IF NOT EXISTS test
882WITH(
883  ttl = '1h'
884)"#,
885                    &new_sql
886                );
887            }
888            _ => {
889                unreachable!();
890            }
891        }
892    }
893
894    #[test]
895    fn test_display_create_table_like() {
896        let sql = r"create table t2 like t1;";
897        let stmts =
898            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
899                .unwrap();
900        assert_eq!(1, stmts.len());
901        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });
902
903        match &stmts[0] {
904            Statement::CreateTableLike(create) => {
905                let new_sql = format!("\n{}", create);
906                assert_eq!(
907                    r#"
908CREATE TABLE t2 LIKE t1"#,
909                    &new_sql
910                );
911            }
912            _ => {
913                unreachable!();
914            }
915        }
916    }
917
918    #[test]
919    fn test_display_create_external_table() {
920        let sql = r#"CREATE EXTERNAL TABLE city (
921            host string,
922            ts timestamp,
923            cpu float64 default 0,
924            memory float64,
925            TIME INDEX (ts),
926            PRIMARY KEY(host)
927) WITH (location='/var/data/city.csv', format='csv');"#;
928        let stmts =
929            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
930                .unwrap();
931        assert_eq!(1, stmts.len());
932        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });
933
934        match &stmts[0] {
935            Statement::CreateExternalTable(create) => {
936                let new_sql = format!("\n{}", create);
937                assert_eq!(
938                    r#"
939CREATE EXTERNAL TABLE city (
940  host STRING,
941  ts TIMESTAMP,
942  cpu DOUBLE DEFAULT 0,
943  memory DOUBLE,
944  TIME INDEX (ts),
945  PRIMARY KEY (host)
946)
947ENGINE=file
948WITH(
949  format = 'csv',
950  location = '/var/data/city.csv'
951)"#,
952                    &new_sql
953                );
954            }
955            _ => {
956                unreachable!();
957            }
958        }
959    }
960
961    #[test]
962    fn test_display_create_flow() {
963        let sql = r"CREATE FLOW filter_numbers
964            SINK TO out_num_cnt
965            AS SELECT number FROM numbers_input where number > 10;";
966        let result =
967            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
968                .unwrap();
969        assert_eq!(1, result.len());
970
971        match &result[0] {
972            Statement::CreateFlow(c) => {
973                let new_sql = format!("\n{}", c);
974                assert_eq!(
975                    r#"
976CREATE FLOW filter_numbers
977SINK TO out_num_cnt
978AS SELECT number FROM numbers_input where number > 10"#,
979                    &new_sql
980                );
981
982                let new_result = ParserContext::create_with_dialect(
983                    &new_sql,
984                    &GreptimeDbDialect {},
985                    ParseOptions::default(),
986                )
987                .unwrap();
988                assert_eq!(result, new_result);
989            }
990            _ => unreachable!(),
991        }
992    }
993
994    #[test]
995    fn test_vector_index_options_validation() {
996        use super::{ColumnExtensions, OptionMap};
997
998        // Test zero connectivity should fail
999        let extensions = ColumnExtensions {
1000            fulltext_index_options: None,
1001            vector_options: None,
1002            skipping_index_options: None,
1003            inverted_index_options: None,
1004            json_datatype_options: None,
1005            vector_index_options: Some(OptionMap::from([(
1006                "connectivity".to_string(),
1007                "0".to_string(),
1008            )])),
1009        };
1010        let result = extensions.build_vector_index_options();
1011        assert!(result.is_err());
1012        assert!(
1013            result
1014                .unwrap_err()
1015                .to_string()
1016                .contains("connectivity must be in the range [2, 2048]")
1017        );
1018
1019        // Test zero expansion_add should fail
1020        let extensions = ColumnExtensions {
1021            fulltext_index_options: None,
1022            vector_options: None,
1023            skipping_index_options: None,
1024            inverted_index_options: None,
1025            json_datatype_options: None,
1026            vector_index_options: Some(OptionMap::from([(
1027                "expansion_add".to_string(),
1028                "0".to_string(),
1029            )])),
1030        };
1031        let result = extensions.build_vector_index_options();
1032        assert!(result.is_err());
1033        assert!(
1034            result
1035                .unwrap_err()
1036                .to_string()
1037                .contains("expansion_add must be greater than 0")
1038        );
1039
1040        // Test zero expansion_search should fail
1041        let extensions = ColumnExtensions {
1042            fulltext_index_options: None,
1043            vector_options: None,
1044            skipping_index_options: None,
1045            inverted_index_options: None,
1046            json_datatype_options: None,
1047            vector_index_options: Some(OptionMap::from([(
1048                "expansion_search".to_string(),
1049                "0".to_string(),
1050            )])),
1051        };
1052        let result = extensions.build_vector_index_options();
1053        assert!(result.is_err());
1054        assert!(
1055            result
1056                .unwrap_err()
1057                .to_string()
1058                .contains("expansion_search must be greater than 0")
1059        );
1060
1061        // Test valid values should succeed
1062        let extensions = ColumnExtensions {
1063            fulltext_index_options: None,
1064            vector_options: None,
1065            skipping_index_options: None,
1066            inverted_index_options: None,
1067            json_datatype_options: None,
1068            vector_index_options: Some(OptionMap::from([
1069                ("connectivity".to_string(), "32".to_string()),
1070                ("expansion_add".to_string(), "200".to_string()),
1071                ("expansion_search".to_string(), "100".to_string()),
1072            ])),
1073        };
1074        let result = extensions.build_vector_index_options();
1075        assert!(result.is_ok());
1076        let options = result.unwrap().unwrap();
1077        assert_eq!(options.connectivity, 32);
1078        assert_eq!(options.expansion_add, 200);
1079        assert_eq!(options.expansion_search, 100);
1080    }
1081}