1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86 table_name: &TableReference<'_>,
87 column_schemas: &[api::v1::ColumnSchema],
88 engine: &str,
89 desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92 let expr = common_grpc_expr::util::build_create_table_expr(
93 None,
94 table_name,
95 column_exprs,
96 engine,
97 desc.unwrap_or("Created on insertion"),
98 )
99 .context(BuildCreateExprOnInsertionSnafu)?;
100
101 validate_create_expr(&expr)?;
102 Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106 schema: &Schema,
107 column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110 .context(FindNewColumnsOnInsertionSnafu)?;
111 if let Some(add_columns) = &add_columns {
112 validate_add_columns_expr(add_columns)?;
113 }
114 Ok(add_columns)
115}
116
117pub(crate) async fn create_external_expr(
141 create: CreateExternalTable,
142 query_ctx: &QueryContextRef,
143) -> Result<CreateTableExpr> {
144 let (catalog_name, schema_name, table_name) =
145 table_idents_to_full_name(&create.name, query_ctx)
146 .map_err(BoxedError::new)
147 .context(ExternalSnafu)?;
148
149 let mut table_options = create.options.into_map();
150
151 let (object_store, files) = prepare_file_table_files(&table_options)
152 .await
153 .context(PrepareFileTableSnafu)?;
154
155 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
156 .await
157 .context(InferFileTableSchemaSnafu)?
158 .column_schemas;
159
160 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
161 let time_index = find_time_index(&create.constraints)?;
163 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
164 let column_schemas =
165 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
166 (time_index, primary_keys, column_schemas)
167 } else {
168 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
170 let primary_keys = vec![];
171 (time_index, primary_keys, column_schemas)
172 };
173
174 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
175 .context(SchemaIncompatibleSnafu)?;
176
177 let meta = FileOptions {
178 files,
179 file_column_schemas,
180 };
181 table_options.insert(
182 FILE_TABLE_META_KEY.to_string(),
183 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
184 );
185
186 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
187 let expr = CreateTableExpr {
188 catalog_name,
189 schema_name,
190 table_name,
191 desc: String::default(),
192 column_defs,
193 time_index,
194 primary_keys,
195 create_if_not_exists: create.if_not_exists,
196 table_options,
197 table_id: None,
198 engine: create.engine.clone(),
199 };
200
201 Ok(expr)
202}
203
204pub fn create_to_expr(
206 create: &CreateTable,
207 query_ctx: &QueryContextRef,
208) -> Result<CreateTableExpr> {
209 let (catalog_name, schema_name, table_name) =
210 table_idents_to_full_name(&create.name, query_ctx)
211 .map_err(BoxedError::new)
212 .context(ExternalSnafu)?;
213
214 let time_index = find_time_index(&create.constraints)?;
215 let table_options = HashMap::from(
216 &TableOptions::try_from_iter(create.options.to_str_map())
217 .context(UnrecognizedTableOptionSnafu)?,
218 );
219
220 let mut table_options = table_options;
221 if table_options.contains_key(COMPACTION_TYPE) {
222 table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
223 }
224
225 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
226
227 let expr = CreateTableExpr {
228 catalog_name,
229 schema_name,
230 table_name,
231 desc: String::default(),
232 column_defs: columns_to_expr(
233 &create.columns,
234 &time_index,
235 &primary_keys,
236 Some(&query_ctx.timezone()),
237 )?,
238 time_index,
239 primary_keys,
240 create_if_not_exists: create.if_not_exists,
241 table_options,
242 table_id: None,
243 engine: create.engine.clone(),
244 };
245
246 validate_create_expr(&expr)?;
247 Ok(expr)
248}
249
250pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
258 let quote_style = quote_style.unwrap_or('`');
259
260 let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
262 sql::ast::Ident::with_quote(quote_style, &expr.table_name),
263 )]);
264
265 let mut columns = Vec::with_capacity(expr.column_defs.len());
267 for column_def in &expr.column_defs {
268 let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
269 column: &column_def.name,
270 })?;
271
272 let mut options = Vec::new();
273
274 if column_def.is_nullable {
276 options.push(ColumnOptionDef {
277 name: None,
278 option: ColumnOption::Null,
279 });
280 } else {
281 options.push(ColumnOptionDef {
282 name: None,
283 option: ColumnOption::NotNull,
284 });
285 }
286
287 if let Some(default_constraint) = column_schema.default_constraint() {
289 let expr = match default_constraint {
290 ColumnDefaultConstraint::Value(v) => {
291 Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
292 }
293 ColumnDefaultConstraint::Function(func_expr) => {
294 ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
295 .context(ParseSqlSnafu)?
296 }
297 };
298 options.push(ColumnOptionDef {
299 name: None,
300 option: ColumnOption::Default(expr),
301 });
302 }
303
304 if !column_def.comment.is_empty() {
306 options.push(ColumnOptionDef {
307 name: None,
308 option: ColumnOption::Comment(column_def.comment.clone()),
309 });
310 }
311
312 let mut extensions = ColumnExtensions::default();
317
318 if let Ok(Some(opt)) = column_schema.fulltext_options()
320 && opt.enable
321 {
322 let mut map = HashMap::from([
323 (
324 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
325 opt.analyzer.to_string(),
326 ),
327 (
328 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
329 opt.case_sensitive.to_string(),
330 ),
331 (
332 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
333 opt.backend.to_string(),
334 ),
335 ]);
336 if opt.backend == FulltextBackend::Bloom {
337 map.insert(
338 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
339 opt.granularity.to_string(),
340 );
341 map.insert(
342 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
343 opt.false_positive_rate().to_string(),
344 );
345 }
346 extensions.fulltext_index_options = Some(map.into());
347 }
348
349 if let Ok(Some(opt)) = column_schema.skipping_index_options() {
351 let map = HashMap::from([
352 (
353 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
354 opt.granularity.to_string(),
355 ),
356 (
357 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
358 opt.false_positive_rate().to_string(),
359 ),
360 (
361 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
362 opt.index_type.to_string(),
363 ),
364 ]);
365 extensions.skipping_index_options = Some(map.into());
366 }
367
368 if column_schema.is_inverted_indexed() {
370 extensions.inverted_index_options = Some(HashMap::new().into());
371 }
372
373 let sql_column = SqlColumn {
374 column_def: ColumnDef {
375 name: Ident::with_quote(quote_style, &column_def.name),
376 data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
377 .context(ParseSqlSnafu)?,
378 options,
379 },
380 extensions,
381 };
382
383 columns.push(sql_column);
384 }
385
386 let mut constraints = Vec::new();
388
389 constraints.push(TableConstraint::TimeIndex {
391 column: Ident::with_quote(quote_style, &expr.time_index),
392 });
393
394 if !expr.primary_keys.is_empty() {
396 let primary_key_columns: Vec<Ident> = expr
397 .primary_keys
398 .iter()
399 .map(|pk| Ident::with_quote(quote_style, pk))
400 .collect();
401
402 constraints.push(TableConstraint::PrimaryKey {
403 columns: primary_key_columns,
404 });
405 }
406
407 let mut options = OptionMap::default();
409 for (key, value) in &expr.table_options {
410 options.insert(key.clone(), value.clone());
411 }
412
413 Ok(CreateTable {
414 if_not_exists: expr.create_if_not_exists,
415 table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
416 name: table_name,
417 columns,
418 engine: expr.engine.clone(),
419 constraints,
420 options,
421 partitions: None,
422 })
423}
424
425pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
427 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
429 for (idx, column) in create.column_defs.iter().enumerate() {
430 if let Some(indices) = column_to_indices.get(&column.name) {
431 return InvalidSqlSnafu {
432 err_msg: format!(
433 "column name `{}` is duplicated at index {} and {}",
434 column.name, indices, idx
435 ),
436 }
437 .fail();
438 }
439 column_to_indices.insert(&column.name, idx);
440 }
441
442 let _ = column_to_indices
444 .get(&create.time_index)
445 .with_context(|| InvalidSqlSnafu {
446 err_msg: format!(
447 "column name `{}` is not found in column list",
448 create.time_index
449 ),
450 })?;
451
452 for pk in &create.primary_keys {
454 let _ = column_to_indices
455 .get(&pk)
456 .with_context(|| InvalidSqlSnafu {
457 err_msg: format!("column name `{}` is not found in column list", pk),
458 })?;
459 }
460
461 let mut pk_set = HashSet::new();
463 for pk in &create.primary_keys {
464 if !pk_set.insert(pk) {
465 return InvalidSqlSnafu {
466 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
467 }
468 .fail();
469 }
470 }
471
472 if pk_set.contains(&create.time_index) {
474 return InvalidSqlSnafu {
475 err_msg: format!(
476 "column name `{}` is both primary key and time index",
477 create.time_index
478 ),
479 }
480 .fail();
481 }
482
483 for column in &create.column_defs {
484 if is_interval_type(&column.data_type()) {
486 return InvalidSqlSnafu {
487 err_msg: format!(
488 "column name `{}` is interval type, which is not supported",
489 column.name
490 ),
491 }
492 .fail();
493 }
494 if is_date_time_type(&column.data_type()) {
496 return InvalidSqlSnafu {
497 err_msg: format!(
498 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
499 column.name
500 ),
501 }
502 .fail();
503 }
504 }
505 Ok(())
506}
507
508fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
509 for add_column in &add_columns.add_columns {
510 let Some(column_def) = &add_column.column_def else {
511 continue;
512 };
513 if is_date_time_type(&column_def.data_type()) {
514 return InvalidSqlSnafu {
515 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
516 }
517 .fail();
518 }
519 if is_interval_type(&column_def.data_type()) {
520 return InvalidSqlSnafu {
521 err_msg: format!(
522 "column name `{}` is interval type, which is not supported",
523 column_def.name
524 ),
525 }
526 .fail();
527 }
528 }
529 Ok(())
530}
531
532fn is_date_time_type(data_type: &ColumnDataType) -> bool {
533 matches!(data_type, ColumnDataType::Datetime)
534}
535
536fn is_interval_type(data_type: &ColumnDataType) -> bool {
537 matches!(
538 data_type,
539 ColumnDataType::IntervalYearMonth
540 | ColumnDataType::IntervalDayTime
541 | ColumnDataType::IntervalMonthDayNano
542 )
543}
544
545fn find_primary_keys(
546 columns: &[SqlColumn],
547 constraints: &[TableConstraint],
548) -> Result<Vec<String>> {
549 let columns_pk = columns
550 .iter()
551 .filter_map(|x| {
552 if x.options().iter().any(|o| {
553 matches!(
554 o.option,
555 ColumnOption::Unique {
556 is_primary: true,
557 ..
558 }
559 )
560 }) {
561 Some(x.name().value.clone())
562 } else {
563 None
564 }
565 })
566 .collect::<Vec<String>>();
567
568 ensure!(
569 columns_pk.len() <= 1,
570 IllegalPrimaryKeysDefSnafu {
571 msg: "not allowed to inline multiple primary keys in columns options"
572 }
573 );
574
575 let constraints_pk = constraints
576 .iter()
577 .filter_map(|constraint| match constraint {
578 TableConstraint::PrimaryKey { columns, .. } => {
579 Some(columns.iter().map(|ident| ident.value.clone()))
580 }
581 _ => None,
582 })
583 .flatten()
584 .collect::<Vec<String>>();
585
586 ensure!(
587 columns_pk.is_empty() || constraints_pk.is_empty(),
588 IllegalPrimaryKeysDefSnafu {
589 msg: "found definitions of primary keys in multiple places"
590 }
591 );
592
593 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
594 primary_keys.extend(columns_pk);
595 primary_keys.extend(constraints_pk);
596 Ok(primary_keys)
597}
598
599pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
600 let time_index = constraints
601 .iter()
602 .filter_map(|constraint| match constraint {
603 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
604 _ => None,
605 })
606 .collect::<Vec<&String>>();
607 ensure!(
608 time_index.len() == 1,
609 InvalidSqlSnafu {
610 err_msg: "must have one and only one TimeIndex columns",
611 }
612 );
613 Ok(time_index[0].clone())
614}
615
616fn columns_to_expr(
617 column_defs: &[SqlColumn],
618 time_index: &str,
619 primary_keys: &[String],
620 timezone: Option<&Timezone>,
621) -> Result<Vec<api::v1::ColumnDef>> {
622 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
623 column_schemas_to_defs(column_schemas, primary_keys)
624}
625
626fn columns_to_column_schemas(
627 columns: &[SqlColumn],
628 time_index: &str,
629 timezone: Option<&Timezone>,
630) -> Result<Vec<ColumnSchema>> {
631 columns
632 .iter()
633 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
634 .collect::<Result<Vec<ColumnSchema>>>()
635}
636
637pub fn column_schemas_to_defs(
639 column_schemas: Vec<ColumnSchema>,
640 primary_keys: &[String],
641) -> Result<Vec<api::v1::ColumnDef>> {
642 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
643 .iter()
644 .map(|c| {
645 ColumnDataTypeWrapper::try_from(c.data_type.clone())
646 .map(|w| w.to_parts())
647 .context(ColumnDataTypeSnafu)
648 })
649 .collect::<Result<Vec<_>>>()?;
650
651 column_schemas
652 .iter()
653 .zip(column_datatypes)
654 .map(|(schema, datatype)| {
655 let semantic_type = if schema.is_time_index() {
656 SemanticType::Timestamp
657 } else if primary_keys.contains(&schema.name) {
658 SemanticType::Tag
659 } else {
660 SemanticType::Field
661 } as i32;
662 let comment = schema
663 .metadata()
664 .get(COMMENT_KEY)
665 .cloned()
666 .unwrap_or_default();
667
668 Ok(api::v1::ColumnDef {
669 name: schema.name.clone(),
670 data_type: datatype.0 as i32,
671 is_nullable: schema.is_nullable(),
672 default_constraint: match schema.default_constraint() {
673 None => vec![],
674 Some(v) => {
675 v.clone()
676 .try_into()
677 .context(ConvertColumnDefaultConstraintSnafu {
678 column_name: &schema.name,
679 })?
680 }
681 },
682 semantic_type,
683 comment,
684 datatype_extension: datatype.1,
685 options: options_from_column_schema(schema),
686 })
687 })
688 .collect()
689}
690
/// A resolved `ALTER TABLE ... REPARTITION` request.
///
/// Built by `to_repartition_request` from a parsed `AlterTable` statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionRequest {
    /// Catalog part of the fully resolved table name.
    pub catalog_name: String,
    /// Schema part of the fully resolved table name.
    pub schema_name: String,
    /// Table part of the fully resolved table name.
    pub table_name: String,
    /// The `from_exprs` of the parsed repartition operation
    /// (presumably the partition expressions being replaced; confirm against the parser).
    pub from_exprs: Vec<Expr>,
    /// The `into_exprs` of the parsed repartition operation
    /// (presumably the partition expressions replacing them; confirm against the parser).
    pub into_exprs: Vec<Expr>,
}
699
700pub(crate) fn to_repartition_request(
701 alter_table: AlterTable,
702 query_ctx: &QueryContextRef,
703) -> Result<RepartitionRequest> {
704 let (catalog_name, schema_name, table_name) =
705 table_idents_to_full_name(alter_table.table_name(), query_ctx)
706 .map_err(BoxedError::new)
707 .context(ExternalSnafu)?;
708
709 let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else {
710 return InvalidSqlSnafu {
711 err_msg: "expected REPARTITION operation",
712 }
713 .fail();
714 };
715
716 Ok(RepartitionRequest {
717 catalog_name,
718 schema_name,
719 table_name,
720 from_exprs: operation.from_exprs,
721 into_exprs: operation.into_exprs,
722 })
723}
724
/// Converts an `ALTER TABLE` statement into a gRPC [`AlterTableExpr`].
///
/// The table name is resolved against `query_ctx`, then the statement's
/// operation is mapped to the matching `AlterTableKind`.
///
/// # Errors
///
/// Returns an error if:
/// - the table name cannot be resolved;
/// - the operation is `ADD CONSTRAINT` or `REPARTITION` (not supported here);
/// - an added column cannot be converted, or has an interval type;
/// - a column type modification targets an unconvertible or interval type;
/// - a default constraint cannot be JSON-encoded.
pub(crate) fn to_alter_table_expr(
    alter_table: AlterTable,
    query_ctx: &QueryContextRef,
) -> Result<AlterTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(alter_table.table_name(), query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let kind = match alter_table.alter_operation {
        AlterTableOperation::AddConstraint(_) => {
            return NotSupportedSnafu {
                feat: "ADD CONSTRAINT",
            }
            .fail();
        }
        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
            add_columns: add_columns
                .into_iter()
                .map(|add_column| {
                    let column_def = sql_column_def_to_grpc_column_def(
                        &add_column.column_def,
                        Some(&query_ctx.timezone()),
                    )
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;
                    // Interval columns cannot be added.
                    if is_interval_type(&column_def.data_type()) {
                        return NotSupportedSnafu {
                            feat: "Add column with interval type",
                        }
                        .fail();
                    }
                    Ok(AddColumn {
                        column_def: Some(column_def),
                        location: add_column.location.as_ref().map(From::from),
                        add_if_not_exists: add_column.add_if_not_exists,
                    })
                })
                .collect::<Result<Vec<AddColumn>>>()?,
        }),
        AlterTableOperation::ModifyColumnType {
            column_name,
            target_type,
        } => {
            let target_type =
                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
                    .context(ParseSqlSnafu)?;
            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
                .map(|w| w.to_parts())
                .context(ColumnDataTypeSnafu)?;
            // Changing a column to an interval type is rejected as well.
            if is_interval_type(&target_type) {
                return NotSupportedSnafu {
                    feat: "Modify column type to interval type",
                }
                .fail();
            }
            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
                modify_column_types: vec![ModifyColumnType {
                    column_name: column_name.value,
                    target_type: target_type as i32,
                    target_type_extension,
                }],
            })
        }
        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
            drop_columns: vec![DropColumn {
                name: name.value.clone(),
            }],
        }),
        AlterTableOperation::RenameTable { new_table_name } => {
            AlterTableKind::RenameTable(RenameTable {
                new_table_name: new_table_name.clone(),
            })
        }
        AlterTableOperation::SetTableOptions { options } => {
            AlterTableKind::SetTableOptions(SetTableOptions {
                table_options: options.into_iter().map(Into::into).collect(),
            })
        }
        AlterTableOperation::UnsetTableOptions { keys } => {
            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
        }
        // This function rejects REPARTITION; `to_repartition_request` is the
        // function that accepts that operation.
        AlterTableOperation::Repartition { .. } => {
            return NotSupportedSnafu {
                feat: "ALTER TABLE ... REPARTITION",
            }
            .fail();
        }
        AlterTableOperation::SetIndex { options } => {
            // Translate the SQL-level index options into their protobuf counterparts.
            let option = match options {
                sql::statements::alter::SetIndexOperation::Fulltext {
                    column_name,
                    options,
                } => SetIndex {
                    options: Some(set_index::Options::Fulltext(SetFulltext {
                        column_name: column_name.value,
                        enable: options.enable,
                        analyzer: match options.analyzer {
                            FulltextAnalyzer::English => Analyzer::English.into(),
                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
                        },
                        case_sensitive: options.case_sensitive,
                        backend: match options.backend {
                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
                        },
                        granularity: options.granularity as u64,
                        false_positive_rate: options.false_positive_rate(),
                    })),
                },
                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
                    options: Some(set_index::Options::Inverted(SetInverted {
                        column_name: column_name.value,
                    })),
                },
                sql::statements::alter::SetIndexOperation::Skipping {
                    column_name,
                    options,
                } => SetIndex {
                    options: Some(set_index::Options::Skipping(SetSkipping {
                        column_name: column_name.value,
                        // Setting a skipping index always enables it.
                        enable: true,
                        granularity: options.granularity as u64,
                        false_positive_rate: options.false_positive_rate(),
                        skipping_index_type: match options.index_type {
                            SkippingIndexType::BloomFilter => {
                                PbSkippingIndexType::BloomFilter.into()
                            }
                        },
                    })),
                },
            };
            AlterTableKind::SetIndexes(SetIndexes {
                set_indexes: vec![option],
            })
        }
        AlterTableOperation::UnsetIndex { options } => {
            let option = match options {
                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
                            column_name: column_name.value,
                        })),
                    }
                }
                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Inverted(UnsetInverted {
                            column_name: column_name.value,
                        })),
                    }
                }
                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
                            column_name: column_name.value,
                        })),
                    }
                }
            };

            AlterTableKind::UnsetIndexes(UnsetIndexes {
                unset_indexes: vec![option],
            })
        }
        AlterTableOperation::DropDefaults { columns } => {
            AlterTableKind::DropDefaults(DropDefaults {
                drop_defaults: columns
                    .into_iter()
                    .map(|col| {
                        let column_name = col.0.to_string();
                        Ok(api::v1::DropDefault { column_name })
                    })
                    .collect::<Result<Vec<_>>>()?,
            })
        }
        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
            set_defaults: defaults
                .into_iter()
                .map(|col| {
                    let column_name = col.column_name.to_string();
                    // The default constraint is sent as JSON-encoded bytes.
                    let default_constraint = serde_json::to_string(&col.default_constraint)
                        .context(EncodeJsonSnafu)?
                        .into_bytes();
                    Ok(api::v1::SetDefault {
                        column_name,
                        default_constraint,
                    })
                })
                .collect::<Result<Vec<_>>>()?,
        }),
    };

    Ok(AlterTableExpr {
        catalog_name,
        schema_name,
        table_name,
        kind: Some(kind),
    })
}
926
927pub fn to_alter_database_expr(
929 alter_database: AlterDatabase,
930 query_ctx: &QueryContextRef,
931) -> Result<AlterDatabaseExpr> {
932 let catalog = query_ctx.current_catalog();
933 let schema = alter_database.database_name;
934
935 let kind = match alter_database.alter_operation {
936 AlterDatabaseOperation::SetDatabaseOption { options } => {
937 let options = options.into_iter().map(Into::into).collect();
938 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
939 set_database_options: options,
940 })
941 }
942 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
943 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
944 }
945 };
946
947 Ok(AlterDatabaseExpr {
948 catalog_name: catalog.to_string(),
949 schema_name: schema.to_string(),
950 kind: Some(kind),
951 })
952}
953
954pub fn to_create_view_expr(
956 stmt: CreateView,
957 logical_plan: Vec<u8>,
958 table_names: Vec<TableName>,
959 columns: Vec<String>,
960 plan_columns: Vec<String>,
961 definition: String,
962 query_ctx: QueryContextRef,
963) -> Result<CreateViewExpr> {
964 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
965 .map_err(BoxedError::new)
966 .context(ExternalSnafu)?;
967
968 let expr = CreateViewExpr {
969 catalog_name,
970 schema_name,
971 view_name,
972 logical_plan,
973 create_if_not_exists: stmt.if_not_exists,
974 or_replace: stmt.or_replace,
975 table_names,
976 columns,
977 plan_columns,
978 definition,
979 };
980
981 Ok(expr)
982}
983
984pub fn to_create_flow_task_expr(
985 create_flow: CreateFlow,
986 query_ctx: &QueryContextRef,
987) -> Result<CreateFlowExpr> {
988 let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
990 .with_context(|_| ConvertIdentifierSnafu {
991 ident: create_flow.sink_table_name.to_string(),
992 })?;
993 let catalog = sink_table_ref
994 .catalog()
995 .unwrap_or(query_ctx.current_catalog())
996 .to_string();
997 let schema = sink_table_ref
998 .schema()
999 .map(|s| s.to_owned())
1000 .unwrap_or(query_ctx.current_schema());
1001
1002 let sink_table_name = TableName {
1003 catalog_name: catalog,
1004 schema_name: schema,
1005 table_name: sink_table_ref.table().to_string(),
1006 };
1007
1008 let source_table_names = extract_tables_from_query(&create_flow.query)
1009 .map(|name| {
1010 let reference =
1011 object_name_to_table_reference(name.clone(), true).with_context(|_| {
1012 ConvertIdentifierSnafu {
1013 ident: name.to_string(),
1014 }
1015 })?;
1016 let catalog = reference
1017 .catalog()
1018 .unwrap_or(query_ctx.current_catalog())
1019 .to_string();
1020 let schema = reference
1021 .schema()
1022 .map(|s| s.to_string())
1023 .unwrap_or(query_ctx.current_schema());
1024
1025 let table_name = TableName {
1026 catalog_name: catalog,
1027 schema_name: schema,
1028 table_name: reference.table().to_string(),
1029 };
1030 Ok(table_name)
1031 })
1032 .collect::<Result<Vec<_>>>()?;
1033
1034 let eval_interval = create_flow.eval_interval;
1035
1036 Ok(CreateFlowExpr {
1037 catalog_name: query_ctx.current_catalog().to_string(),
1038 flow_name: sanitize_flow_name(create_flow.flow_name)?,
1039 source_table_names,
1040 sink_table_name: Some(sink_table_name),
1041 or_replace: create_flow.or_replace,
1042 create_if_not_exists: create_flow.if_not_exists,
1043 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1044 eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1045 comment: create_flow.comment.unwrap_or_default(),
1046 sql: create_flow.query.to_string(),
1047 flow_options: Default::default(),
1048 })
1049}
1050
1051fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1053 ensure!(
1054 flow_name.0.len() == 1,
1055 InvalidFlowNameSnafu {
1056 name: flow_name.to_string(),
1057 }
1058 );
1059 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1066 use datatypes::value::Value;
1067 use session::context::{QueryContext, QueryContextBuilder};
1068 use sql::dialect::GreptimeDbDialect;
1069 use sql::parser::{ParseOptions, ParserContext};
1070 use sql::statements::statement::Statement;
1071 use store_api::storage::ColumnDefaultConstraint;
1072
1073 use super::*;
1074
1075 #[test]
1076 fn test_create_flow_tql_expr() {
1077 let sql = r#"
1078CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1079TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1080 let stmt =
1081 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1082
1083 assert!(
1084 stmt.is_err(),
1085 "Expected error for invalid TQL EVAL parameters: {:#?}",
1086 stmt
1087 );
1088
1089 let sql = r#"
1090CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1091TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1092 let stmt =
1093 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1094 .unwrap()
1095 .pop()
1096 .unwrap();
1097
1098 let Statement::CreateFlow(create_flow) = stmt else {
1099 unreachable!()
1100 };
1101 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1102
1103 let to_dot_sep =
1104 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1105 assert_eq!("calc_reqs", expr.flow_name);
1106 assert_eq!("greptime", expr.catalog_name);
1107 assert_eq!(
1108 "greptime.public.cnt_reqs",
1109 expr.sink_table_name.map(to_dot_sep).unwrap()
1110 );
1111 assert!(expr.source_table_names.is_empty());
1112 assert_eq!(
1113 r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1114 expr.sql
1115 );
1116 }
1117
/// Verifies how `CREATE FLOW` statements are turned into flow task expressions.
///
/// Three statements are covered: a flow reading from an unqualified table, a flow whose sink
/// and source tables carry an explicit schema, and a flow with a dotted name, for which the
/// conversion is expected to fail with an "Invalid flow name" error.
#[test]
fn test_create_flow_expr() {
    /// Formats a table name as `catalog.schema.table`.
    fn dotted(name: &TableName) -> String {
        format!(
            "{}.{}.{}",
            name.catalog_name, name.schema_name, name.table_name
        )
    }

    // Parses `sql`, takes the last parsed statement (which has to be a `CREATE FLOW`) and
    // converts it using a fresh `QueryContext::arc()`.
    let convert = |sql: &str| {
        let parsed =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        match parsed {
            Statement::CreateFlow(create_flow) => {
                to_create_flow_task_expr(create_flow, &QueryContext::arc())
            }
            _ => unreachable!(),
        }
    };

    // Unqualified sink and source tables.
    let distinct_flow = convert(
        r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;",
    )
    .unwrap();
    assert_eq!("test_distinct_basic", distinct_flow.flow_name);
    assert_eq!("greptime", distinct_flow.catalog_name);
    assert_eq!(
        "greptime.public.out_distinct_basic",
        distinct_flow.sink_table_name.as_ref().map(dotted).unwrap()
    );
    assert_eq!(1, distinct_flow.source_table_names.len());
    assert_eq!(
        "greptime.public.distinct_basic",
        dotted(&distinct_flow.source_table_names[0])
    );
    assert_eq!(
        r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
        distinct_flow.sql
    );

    // Schema-qualified sink and source tables, quoted flow name.
    let qualified_flow = convert(
        r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
    )
    .unwrap();
    assert_eq!("task_2", qualified_flow.flow_name);
    assert_eq!("greptime", qualified_flow.catalog_name);
    assert_eq!(
        "greptime.schema_1.table_1",
        qualified_flow.sink_table_name.as_ref().map(dotted).unwrap()
    );
    assert_eq!(1, qualified_flow.source_table_names.len());
    assert_eq!(
        "greptime.schema_2.table_2",
        dotted(&qualified_flow.source_table_names[0])
    );
    assert_eq!(
        "SELECT max(c1), min(c2) FROM schema_2.table_2",
        qualified_flow.sql
    );

    // A dotted flow name parses, but the conversion has to report it as invalid.
    let rejected = convert(
        r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
    );
    assert!(rejected.is_err());
    assert!(
        rejected
            .unwrap_err()
            .to_string()
            .contains("Invalid flow name: abc.`task_2`")
    );
}
1212
/// Checks that the `WITH` options of a `CREATE TABLE` statement show up in the expression's
/// table options: `ttl` is expected back as written, while `write_buffer_size='1024KB'` is
/// expected back as `1.0MiB`.
#[test]
fn test_create_to_expr() {
    let mut statements = ParserContext::create_with_dialect(
        "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
    let create_table = match statements.pop().unwrap() {
        Statement::CreateTable(create_table) => create_table,
        _ => unreachable!(),
    };

    let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
    let options = &expr.table_options;
    assert_eq!("3days", options.get("ttl").unwrap());
    assert_eq!("1.0MiB", options.get("write_buffer_size").unwrap());
}
1232
/// Makes sure `create_to_expr` returns an error for each of a set of `CREATE TABLE`
/// statements that parse successfully but describe an invalid table.
#[test]
fn test_invalid_create_to_expr() {
    let rejected_statements = [
        // `some_column` is declared twice.
        "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
        // `some_column` is listed twice in the primary key.
        "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
        // The time index column `ts` is also listed in the primary key.
        "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
    ];

    rejected_statements.into_iter().for_each(|sql| {
        let parsed =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        match parsed {
            Statement::CreateTable(create_table) => {
                // Parsing succeeded; the conversion itself must fail.
                create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
            }
            _ => unreachable!(),
        }
    });
}
1259
/// Checks how a textual `TIMESTAMP` default in `CREATE TABLE` depends on the query context.
///
/// The same statement is converted twice: with `QueryContext::arc()` the default is expected
/// to read `2024-01-30 00:01:01+0000`, and with a context whose timezone is `+08:00` it is
/// expected to read `2024-01-29 16:01:01+0000`.
#[test]
fn test_create_to_expr_with_default_timestamp_value() {
    let mut statements = ParserContext::create_with_dialect(
        "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
    let create_table = match statements.pop().unwrap() {
        Statement::CreateTable(create_table) => create_table,
        _ => unreachable!(),
    };

    let default_ctx = QueryContext::arc();
    let shifted_ctx = QueryContextBuilder::default()
        .timezone(Timezone::from_tz_string("+08:00").unwrap())
        .build()
        .into();

    for (ctx, expected_default) in [
        (&default_ctx, "2024-01-30 00:01:01+0000"),
        (&shifted_ctx, "2024-01-29 16:01:01+0000"),
    ] {
        let expr = create_to_expr(&create_table, ctx).unwrap();
        // The statement declares `v` first and `ts` second, so `ts` is looked up at index 1.
        let constraint = assert_ts_column(&expr.column_defs[1]);
        assert!(matches!(
            constraint,
            ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                if ts.to_iso8601_string() == expected_default
        ));
    }
}
1295
1296 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1297 assert_eq!("ts", ts_column.name);
1298 assert_eq!(
1299 ColumnDataType::TimestampMillisecond as i32,
1300 ts_column.data_type
1301 );
1302 assert!(!ts_column.default_constraint.is_empty());
1303
1304 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1305 }
1306
/// Exercises the conversion of `ALTER DATABASE` and `ALTER TABLE` statements into their
/// expression forms: setting database options, unsetting database options, and adding a
/// timestamp column whose textual default is checked under two query contexts.
#[test]
fn test_to_alter_expr() {
    /// Parses `sql` and returns the last statement produced by the parser.
    fn parse_last(sql: &str) -> Statement {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
            .pop()
            .unwrap()
    }

    // ALTER DATABASE ... SET: both key/value pairs are expected, in statement order.
    let alter_database =
        match parse_last("ALTER DATABASE greptime SET key1='value1', key2='value2';") {
            Statement::AlterDatabase(alter_database) => alter_database,
            _ => unreachable!(),
        };
    let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
    let set_database_options = match expr.kind.unwrap() {
        AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) => set_database_options,
        _ => unreachable!(),
    };
    let pairs: Vec<_> = set_database_options
        .iter()
        .map(|option| (option.key.as_str(), option.value.as_str()))
        .collect();
    assert_eq!(vec![("key1", "value1"), ("key2", "value2")], pairs);

    // ALTER DATABASE ... UNSET: both keys are expected, in any order.
    let alter_database = match parse_last("ALTER DATABASE greptime UNSET key1, key2;") {
        Statement::AlterDatabase(alter_database) => alter_database,
        _ => unreachable!(),
    };
    let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
    let keys = match expr.kind.unwrap() {
        AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) => keys,
        _ => unreachable!(),
    };
    assert_eq!(2, keys.len());
    for expected in ["key1", "key2"] {
        assert!(keys.iter().any(|key| key.as_str() == expected));
    }

    // ALTER TABLE ... ADD COLUMN with a textual timestamp default, converted once with
    // `QueryContext::arc()` and once with a `+08:00` context.
    let alter_table = match parse_last(
        "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';",
    ) {
        Statement::AlterTable(alter_table) => alter_table,
        _ => unreachable!(),
    };
    let default_ctx = QueryContext::arc();
    let shifted_ctx = QueryContextBuilder::default()
        .timezone(Timezone::from_tz_string("+08:00").unwrap())
        .build()
        .into();

    for (ctx, expected_default) in [
        (&default_ctx, "2024-01-30 00:01:01+0000"),
        (&shifted_ctx, "2024-01-29 16:01:01+0000"),
    ] {
        let expr = to_alter_table_expr(alter_table.clone(), ctx).unwrap();
        let add_columns = match expr.kind.unwrap() {
            AlterTableKind::AddColumns(AddColumns { add_columns, .. }) => add_columns,
            _ => unreachable!(),
        };

        assert_eq!(1, add_columns.len());
        let constraint = assert_ts_column(add_columns[0].column_def.as_ref().unwrap());
        assert!(matches!(
            constraint,
            ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                if ts.to_iso8601_string() == expected_default
        ));
    }
}
1406
/// Checks that `ALTER TABLE ... MODIFY COLUMN <name> <type>` is converted into exactly one
/// modify-column-type entry carrying the column name, the target data type and no type
/// extension.
#[test]
fn test_to_alter_modify_column_type_expr() {
    let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
    let stmt =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
            .pop()
            .unwrap();

    let Statement::AlterTable(alter_table) = stmt else {
        unreachable!()
    };

    // The parsed statement is not needed afterwards, so it is moved into the conversion
    // instead of being cloned first.
    let expr = to_alter_table_expr(alter_table, &QueryContext::arc()).unwrap();
    let kind = expr.kind.unwrap();

    let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
        modify_column_types,
    }) = kind
    else {
        unreachable!()
    };

    assert_eq!(1, modify_column_types.len());
    let modify_column_type = &modify_column_types[0];

    assert_eq!("mem_usage", modify_column_type.column_name);
    assert_eq!(
        ColumnDataType::String as i32,
        modify_column_type.target_type
    );
    assert!(modify_column_type.target_type_extension.is_none());
}
1441
/// Checks that `ALTER TABLE ... REPARTITION (...) INTO (...)` is converted into a repartition
/// request naming the target table and carrying the source and target partition expressions.
#[test]
fn test_to_repartition_request() {
    let mut statements = ParserContext::create_with_dialect(
        r#"
ALTER TABLE metrics REPARTITION (
  device_id < 100
) INTO (
  device_id < 100 AND area < 'South',
  device_id < 100 AND area >= 'South'
);"#,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
    let alter_table = match statements.pop().unwrap() {
        Statement::AlterTable(alter_table) => alter_table,
        _ => unreachable!(),
    };

    let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
    assert_eq!("greptime", request.catalog_name);
    assert_eq!("public", request.schema_name);
    assert_eq!("metrics", request.table_name);

    // Compare the expressions through their textual rendering.
    let rendered_from: Vec<String> = request
        .from_exprs
        .into_iter()
        .map(|expr| expr.to_string())
        .collect();
    assert_eq!(rendered_from, ["device_id < 100"]);

    let rendered_into: Vec<String> = request
        .into_exprs
        .into_iter()
        .map(|expr| expr.to_string())
        .collect();
    assert_eq!(
        rendered_into,
        [
            "device_id < 100 AND area < 'South'",
            "device_id < 100 AND area >= 'South'",
        ]
    );
}
1485
1486 fn new_test_table_names() -> Vec<TableName> {
1487 vec![
1488 TableName {
1489 catalog_name: "greptime".to_string(),
1490 schema_name: "public".to_string(),
1491 table_name: "a_table".to_string(),
1492 },
1493 TableName {
1494 catalog_name: "greptime".to_string(),
1495 schema_name: "public".to_string(),
1496 table_name: "b_table".to_string(),
1497 },
1498 ]
1499 }
1500
/// Checks the expression built for a plain `CREATE VIEW`: the view lands in
/// `greptime.public`, neither `IF NOT EXISTS` nor `OR REPLACE` is flagged, and the plan,
/// table names, definition and column lists are passed through unchanged.
#[test]
fn test_to_create_view_expr() {
    let definition = "CREATE VIEW test AS SELECT * FROM NUMBERS";
    let parsed = ParserContext::create_with_dialect(
        definition,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap()
    .pop()
    .unwrap();
    let create_view = match parsed {
        Statement::CreateView(create_view) => create_view,
        _ => unreachable!(),
    };

    // Inputs handed to the conversion; clones are passed so the originals can be compared
    // against the resulting expression.
    let expected_plan = vec![1, 2, 3];
    let expected_tables = new_test_table_names();
    let expected_columns = vec!["a".to_string()];
    let expected_plan_columns = vec!["number".to_string()];

    let expr = to_create_view_expr(
        create_view,
        expected_plan.clone(),
        expected_tables.clone(),
        expected_columns.clone(),
        expected_plan_columns.clone(),
        definition.to_string(),
        QueryContext::arc(),
    )
    .unwrap();

    assert_eq!(
        ("greptime", "public", "test"),
        (
            expr.catalog_name.as_str(),
            expr.schema_name.as_str(),
            expr.view_name.as_str()
        )
    );
    assert!(!(expr.create_if_not_exists || expr.or_replace));
    assert_eq!(expected_plan, expr.logical_plan);
    assert_eq!(expected_tables, expr.table_names);
    assert_eq!(definition, expr.definition);
    assert_eq!(expected_columns, expr.columns);
    assert_eq!(expected_plan_columns, expr.plan_columns);
}
1541
/// Checks the expression built for `CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view`:
/// the schema comes from the qualified name, both flags are set, and the plan, table names,
/// definition and column lists are passed through unchanged.
#[test]
fn test_to_create_view_expr_complex() {
    let definition = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
    let parsed = ParserContext::create_with_dialect(
        definition,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap()
    .pop()
    .unwrap();
    let create_view = match parsed {
        Statement::CreateView(create_view) => create_view,
        _ => unreachable!(),
    };

    // Inputs handed to the conversion; clones are passed so the originals can be compared
    // against the resulting expression.
    let expected_plan = vec![1, 2, 3];
    let expected_tables = new_test_table_names();
    let expected_columns = vec!["a".to_string()];
    let expected_plan_columns = vec!["number".to_string()];

    let expr = to_create_view_expr(
        create_view,
        expected_plan.clone(),
        expected_tables.clone(),
        expected_columns.clone(),
        expected_plan_columns.clone(),
        definition.to_string(),
        QueryContext::arc(),
    )
    .unwrap();

    assert_eq!(
        ("greptime", "test", "test_view"),
        (
            expr.catalog_name.as_str(),
            expr.schema_name.as_str(),
            expr.view_name.as_str()
        )
    );
    assert!(expr.create_if_not_exists && expr.or_replace);
    assert_eq!(expected_plan, expr.logical_plan);
    assert_eq!(expected_tables, expr.table_names);
    assert_eq!(definition, expr.definition);
    assert_eq!(expected_columns, expr.columns);
    assert_eq!(expected_plan_columns, expr.plan_columns);
}
1582
/// Round-trips a `CREATE TABLE` statement: the SQL is parsed, converted into an expression
/// with `create_to_expr`, converted back with `expr_to_create`, and the alternate (`{:#}`)
/// rendering of the result is expected to equal the original SQL text.
#[test]
fn test_expr_to_create() {
    let original_sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
  `timestamp` TIMESTAMP(9) NOT NULL,
  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
  `username` STRING NULL,
  `http_method` STRING NULL INVERTED INDEX,
  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
  `protocol` STRING NULL,
  `status_code` INT NULL INVERTED INDEX,
  `response_size` BIGINT NULL,
  `message` STRING NULL,
  TIME INDEX (`timestamp`),
  PRIMARY KEY (`username`, `status_code`)
)
ENGINE=mito
WITH(
  append_mode = 'true'
)"#;
    let parsed = ParserContext::create_with_dialect(
        original_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap()
    .pop()
    .unwrap();
    let original_create = match parsed {
        Statement::CreateTable(original_create) => original_create,
        _ => unreachable!(),
    };

    // Statement -> expression.
    let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();

    // Expression -> statement. The backtick is presumably the identifier quote character;
    // the expected SQL quotes every identifier with it.
    let rebuilt = expr_to_create(&expr, Some('`')).unwrap();
    assert_eq!(original_sql, format!("{rebuilt:#}"));
}
1619}