1mod show_create_table;
16
17use std::collections::HashMap;
18use std::ops::ControlFlow;
19use std::sync::Arc;
20
21use catalog::CatalogManagerRef;
22use catalog::information_schema::{
23 CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES,
24 VIEWS, columns, flows, key_column_usage, process_list, region_peers, schemata, tables,
25};
26use common_catalog::consts::{
27 INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
28 SEMANTIC_TYPE_TIME_INDEX,
29};
30use common_catalog::format_full_table_name;
31use common_datasource::file_format::{FileFormat, Format, infer_schemas};
32use common_datasource::lister::{Lister, Source};
33use common_datasource::object_store::build_backend;
34use common_datasource::util::find_dir_and_filename;
35use common_meta::SchemaOptions;
36use common_meta::key::flow::flow_info::FlowInfoValue;
37use common_query::Output;
38use common_query::prelude::greptime_timestamp;
39use common_recordbatch::RecordBatches;
40use common_recordbatch::adapter::RecordBatchStreamAdapter;
41use common_time::Timestamp;
42use common_time::timezone::get_timezone;
43use datafusion::common::ScalarValue;
44use datafusion::prelude::SessionContext;
45use datafusion_expr::{Expr, SortExpr, case, col, lit};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{OptionExt, ResultExt, ensure};
56use sql::ast::{Ident, visit_expressions_mut};
57use sql::parser::ParserContext;
58use sql::statements::OptionMap;
59use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
60use sql::statements::show::{
61 ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
62 ShowTableStatus, ShowTables, ShowVariables, ShowViews,
63};
64use sql::statements::statement::Statement;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::TableRef;
68use table::metadata::TableInfoRef;
69use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
70
71use crate::QueryEngineRef;
72use crate::error::{self, Result, UnsupportedVariableSnafu};
73use crate::planner::DfLogicalPlanner;
74
// MySQL-compatible output column headers shared by the SHOW/DESC statements
// implemented in this module.
const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
const COLUMN_KEY_COLUMN: &str = "Key";
const COLUMN_EXTRA_COLUMN: &str = "Extra";
const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
const COLUMN_COLLATION_COLUMN: &str = "Collation";
const COLUMN_NULLABLE_COLUMN: &str = "Null";
const COLUMN_DEFAULT_COLUMN: &str = "Default";
const COLUMN_COMMENT_COLUMN: &str = "Comment";
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";

// Cell values used when rendering DESC TABLE / SHOW INDEX rows.
const YES_STR: &str = "YES";
const NO_STR: &str = "NO";
const PRI_KEY: &str = "PRI";
const TIME_INDEX: &str = "TIME INDEX";

// Column headers for MySQL-compatible `SHOW INDEX` output.
// NOTE(review): "NONT" in INDEX_NONT_UNIQUE_COLUMN looks like a typo for
// "NON"; the header string itself ("Non_unique") is correct, and renaming the
// const would touch every usage site, so it is left as-is here.
const INDEX_TABLE_COLUMN: &str = "Table";
const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
const INDEX_PACKED_COLUMN: &str = "Packed";
const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
const INDEX_COMMENT_COLUMN: &str = "Index_comment";
const INDEX_VISIBLE_COLUMN: &str = "Visible";
const INDEX_EXPRESSION_COLUMN: &str = "Expression";
const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
111
112static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
113 Arc::new(Schema::new(vec![
114 ColumnSchema::new(
115 COLUMN_NAME_COLUMN,
116 ConcreteDataType::string_datatype(),
117 false,
118 ),
119 ColumnSchema::new(
120 COLUMN_TYPE_COLUMN,
121 ConcreteDataType::string_datatype(),
122 false,
123 ),
124 ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
125 ColumnSchema::new(
126 COLUMN_NULLABLE_COLUMN,
127 ConcreteDataType::string_datatype(),
128 false,
129 ),
130 ColumnSchema::new(
131 COLUMN_DEFAULT_COLUMN,
132 ConcreteDataType::string_datatype(),
133 false,
134 ),
135 ColumnSchema::new(
136 COLUMN_SEMANTIC_TYPE_COLUMN,
137 ConcreteDataType::string_datatype(),
138 false,
139 ),
140 ]))
141});
142
143static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
144 Arc::new(Schema::new(vec![
145 ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
146 ColumnSchema::new(
147 "Create Database",
148 ConcreteDataType::string_datatype(),
149 false,
150 ),
151 ]))
152});
153
154static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
155 Arc::new(Schema::new(vec![
156 ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
157 ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
158 ]))
159});
160
161static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
162 Arc::new(Schema::new(vec![
163 ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
164 ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
165 ]))
166});
167
168static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
169 Arc::new(Schema::new(vec![
170 ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
171 ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
172 ]))
173});
174
175fn null() -> Expr {
176 lit(ScalarValue::Null)
177}
178
179pub async fn show_databases(
180 stmt: ShowDatabases,
181 query_engine: &QueryEngineRef,
182 catalog_manager: &CatalogManagerRef,
183 query_ctx: QueryContextRef,
184) -> Result<Output> {
185 let projects = if stmt.full {
186 vec![
187 (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
188 (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
189 ]
190 } else {
191 vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
192 };
193
194 let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
195 let like_field = Some(schemata::SCHEMA_NAME);
196 let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
197
198 query_from_information_schema_table(
199 query_engine,
200 catalog_manager,
201 query_ctx,
202 SCHEMATA,
203 vec![],
204 projects,
205 filters,
206 like_field,
207 sort,
208 stmt.kind,
209 )
210 .await
211}
212
213fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
216 let _ = visit_expressions_mut(expr, |e| {
217 match e {
218 sqlparser::ast::Expr::Identifier(ident) => {
219 if ident.value.eq_ignore_ascii_case(from_column) {
220 ident.value = to_column.to_string();
221 }
222 }
223 sqlparser::ast::Expr::CompoundIdentifier(idents) => {
224 if let Some(last) = idents.last_mut()
225 && last.value.eq_ignore_ascii_case(from_column)
226 {
227 last.value = to_column.to_string();
228 }
229 }
230 _ => {}
231 }
232 ControlFlow::<()>::Continue(())
233 });
234}
235
#[allow(clippy::too_many_arguments)]
/// Shared executor for all `SHOW ...` statements backed by an
/// `information_schema` table.
///
/// Builds a DataFrame over `table_name`, then applies (in order): the fixed
/// `filters`, an optional `LIKE` filter on `like_field`, `sort`, a projection
/// (either raw `select` expressions or `projects` column/alias pairs), the
/// `projects` renames, and finally the user's `WHERE` clause when
/// `kind == ShowKind::Where`. Returns a streaming [`Output`].
async fn query_from_information_schema_table(
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
    table_name: &str,
    select: Vec<Expr>,
    projects: Vec<(&str, &str)>,
    filters: Vec<Expr>,
    like_field: Option<&str>,
    sort: Vec<SortExpr>,
    kind: ShowKind,
) -> Result<Output> {
    // Resolve the information_schema table in the session's current catalog.
    let table = catalog_manager
        .table(
            query_ctx.current_catalog(),
            INFORMATION_SCHEMA_NAME,
            table_name,
            Some(&query_ctx),
        )
        .await
        .context(error::CatalogSnafu)?
        .with_context(|| error::TableNotFoundSnafu {
            table: format_full_table_name(
                query_ctx.current_catalog(),
                INFORMATION_SCHEMA_NAME,
                table_name,
            ),
        })?;

    let dataframe = query_engine.read_table(table)?;

    // Apply the statement's fixed filters (catalog/schema/table predicates).
    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
        df.filter(expr).context(error::PlanSqlSnafu)
    })?;

    // `SHOW ... LIKE pattern` filters on the designated column, if any.
    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
        dataframe
            .filter(col(field).like(lit(ident.value.clone())))
            .context(error::PlanSqlSnafu)?
    } else {
        dataframe
    };

    let dataframe = if sort.is_empty() {
        dataframe
    } else {
        dataframe.sort(sort).context(error::PlanSqlSnafu)?
    };

    // Explicit `select` expressions take precedence; otherwise project the
    // `projects` pairs as aliased columns (empty `projects` keeps all columns).
    let dataframe = if select.is_empty() {
        if projects.is_empty() {
            dataframe
        } else {
            let projection = projects
                .iter()
                .map(|x| col(x.0).alias(x.1))
                .collect::<Vec<_>>();
            dataframe.select(projection).context(error::PlanSqlSnafu)?
        }
    } else {
        dataframe.select(select).context(error::PlanSqlSnafu)?
    };

    // Rename after projection so `WHERE` below can refer to the output names.
    let dataframe = projects
        .into_iter()
        .try_fold(dataframe, |df, (column, renamed_column)| {
            df.with_column_renamed(column, renamed_column)
                .context(error::PlanSqlSnafu)
        })?;

    let dataframe = match kind {
        ShowKind::All | ShowKind::Like(_) => {
            dataframe
        }
        ShowKind::Where(filter) => {
            // Wrap the frame as a view inside a fresh SessionContext built
            // from the engine's state, so the user's WHERE expression is
            // planned against the renamed output schema.
            let view = dataframe.into_view();
            let dataframe = SessionContext::new_with_state(
                query_engine
                    .engine_context(query_ctx.clone())
                    .state()
                    .clone(),
            )
            .read_table(view)?;

            let planner = query_engine.planner();
            let planner = planner
                .as_any()
                .downcast_ref::<DfLogicalPlanner>()
                .expect("Must be the datafusion planner");

            let filter = planner
                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
                .await?;

            dataframe.filter(filter).context(error::PlanSqlSnafu)?
        }
    };

    let stream = dataframe.execute_stream().await?;

    Ok(Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
    )))
}
357
358pub async fn show_columns(
360 stmt: ShowColumns,
361 query_engine: &QueryEngineRef,
362 catalog_manager: &CatalogManagerRef,
363 query_ctx: QueryContextRef,
364) -> Result<Output> {
365 let schema_name = if let Some(database) = stmt.database {
366 database
367 } else {
368 query_ctx.current_schema()
369 };
370
371 let projects = if stmt.full {
372 vec![
373 (columns::COLUMN_NAME, FIELD_COLUMN),
374 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
375 (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
376 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
377 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
378 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
379 (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
380 (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
381 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
382 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
383 ]
384 } else {
385 vec![
386 (columns::COLUMN_NAME, FIELD_COLUMN),
387 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
388 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
389 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
390 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
391 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
392 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
393 ]
394 };
395
396 let filters = vec![
397 col(columns::TABLE_NAME).eq(lit(&stmt.table)),
398 col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
399 col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
400 ];
401 let like_field = Some(columns::COLUMN_NAME);
402 let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
403
404 query_from_information_schema_table(
405 query_engine,
406 catalog_manager,
407 query_ctx,
408 COLUMNS,
409 vec![],
410 projects,
411 filters,
412 like_field,
413 sort,
414 stmt.kind,
415 )
416 .await
417}
418
419pub async fn show_index(
421 stmt: ShowIndex,
422 query_engine: &QueryEngineRef,
423 catalog_manager: &CatalogManagerRef,
424 query_ctx: QueryContextRef,
425) -> Result<Output> {
426 let schema_name = if let Some(database) = stmt.database {
427 database
428 } else {
429 query_ctx.current_schema()
430 };
431
432 let select = vec![
433 col(key_column_usage::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
434 lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
436 col(key_column_usage::CONSTRAINT_NAME).alias(INDEX_KEY_NAME_COLUMN),
437 col(key_column_usage::ORDINAL_POSITION).alias(INDEX_SEQ_IN_INDEX_COLUMN),
438 col(key_column_usage::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
439 lit("A").alias(COLUMN_COLLATION_COLUMN),
441 null().alias(INDEX_CARDINALITY_COLUMN),
442 null().alias(INDEX_SUB_PART_COLUMN),
443 null().alias(INDEX_PACKED_COLUMN),
444 case(col(key_column_usage::CONSTRAINT_NAME))
449 .when(lit(TIME_INDEX), lit(NO_STR))
450 .otherwise(lit(YES_STR))
451 .context(error::PlanSqlSnafu)?
452 .alias(COLUMN_NULLABLE_COLUMN),
453 col(key_column_usage::GREPTIME_INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
454 lit("").alias(COLUMN_COMMENT_COLUMN),
455 lit("").alias(INDEX_COMMENT_COLUMN),
456 lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
457 null().alias(INDEX_EXPRESSION_COLUMN),
458 ];
459
460 let projects = vec![
461 (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
462 (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
463 (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
464 (
465 key_column_usage::ORDINAL_POSITION,
466 INDEX_SEQ_IN_INDEX_COLUMN,
467 ),
468 (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
469 (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
470 (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
471 (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
472 (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
473 (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
474 (
475 key_column_usage::GREPTIME_INDEX_TYPE,
476 INDEX_INDEX_TYPE_COLUMN,
477 ),
478 (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
479 (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
480 (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
481 (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
482 ];
483
484 let filters = vec![
485 col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
486 col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
487 col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
488 ];
489 let like_field = None;
490 let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
491
492 query_from_information_schema_table(
493 query_engine,
494 catalog_manager,
495 query_ctx,
496 KEY_COLUMN_USAGE,
497 select,
498 projects,
499 filters,
500 like_field,
501 sort,
502 stmt.kind,
503 )
504 .await
505}
506
507pub async fn show_region(
509 stmt: ShowRegion,
510 query_engine: &QueryEngineRef,
511 catalog_manager: &CatalogManagerRef,
512 query_ctx: QueryContextRef,
513) -> Result<Output> {
514 let schema_name = if let Some(database) = stmt.database {
515 database
516 } else {
517 query_ctx.current_schema()
518 };
519
520 let filters = vec![
521 col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
522 col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
523 col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
524 ];
525 let projects = vec![
526 (region_peers::TABLE_NAME, "Table"),
527 (region_peers::REGION_ID, "Region"),
528 (region_peers::PEER_ID, "Peer"),
529 (region_peers::IS_LEADER, "Leader"),
530 ];
531
532 let like_field = None;
533 let sort = vec![
534 col(columns::REGION_ID).sort(true, true),
535 col(columns::PEER_ID).sort(true, true),
536 ];
537
538 query_from_information_schema_table(
539 query_engine,
540 catalog_manager,
541 query_ctx,
542 REGION_PEERS,
543 vec![],
544 projects,
545 filters,
546 like_field,
547 sort,
548 stmt.kind,
549 )
550 .await
551}
552
553pub async fn show_tables(
555 stmt: ShowTables,
556 query_engine: &QueryEngineRef,
557 catalog_manager: &CatalogManagerRef,
558 query_ctx: QueryContextRef,
559) -> Result<Output> {
560 let schema_name = if let Some(database) = stmt.database {
561 database
562 } else {
563 query_ctx.current_schema()
564 };
565
566 let tables_column = format!("Tables_in_{}", schema_name);
568 let projects = if stmt.full {
569 vec![
570 (tables::TABLE_NAME, tables_column.as_str()),
571 (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
572 ]
573 } else {
574 vec![(tables::TABLE_NAME, tables_column.as_str())]
575 };
576 let filters = vec![
577 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
578 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
579 ];
580 let like_field = Some(tables::TABLE_NAME);
581 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
582
583 let kind = match stmt.kind {
586 ShowKind::Where(mut filter) => {
587 replace_column_in_expr(&mut filter, "Tables", &tables_column);
588 ShowKind::Where(filter)
589 }
590 other => other,
591 };
592
593 query_from_information_schema_table(
594 query_engine,
595 catalog_manager,
596 query_ctx,
597 TABLES,
598 vec![],
599 projects,
600 filters,
601 like_field,
602 sort,
603 kind,
604 )
605 .await
606}
607
608pub async fn show_table_status(
610 stmt: ShowTableStatus,
611 query_engine: &QueryEngineRef,
612 catalog_manager: &CatalogManagerRef,
613 query_ctx: QueryContextRef,
614) -> Result<Output> {
615 let schema_name = if let Some(database) = stmt.database {
616 database
617 } else {
618 query_ctx.current_schema()
619 };
620
621 let projects = vec![
623 (tables::TABLE_NAME, "Name"),
624 (tables::ENGINE, "Engine"),
625 (tables::VERSION, "Version"),
626 (tables::ROW_FORMAT, "Row_format"),
627 (tables::TABLE_ROWS, "Rows"),
628 (tables::AVG_ROW_LENGTH, "Avg_row_length"),
629 (tables::DATA_LENGTH, "Data_length"),
630 (tables::MAX_DATA_LENGTH, "Max_data_length"),
631 (tables::INDEX_LENGTH, "Index_length"),
632 (tables::DATA_FREE, "Data_free"),
633 (tables::AUTO_INCREMENT, "Auto_increment"),
634 (tables::CREATE_TIME, "Create_time"),
635 (tables::UPDATE_TIME, "Update_time"),
636 (tables::CHECK_TIME, "Check_time"),
637 (tables::TABLE_COLLATION, "Collation"),
638 (tables::CHECKSUM, "Checksum"),
639 (tables::CREATE_OPTIONS, "Create_options"),
640 (tables::TABLE_COMMENT, "Comment"),
641 ];
642
643 let filters = vec![
644 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
645 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
646 ];
647 let like_field = Some(tables::TABLE_NAME);
648 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
649
650 query_from_information_schema_table(
651 query_engine,
652 catalog_manager,
653 query_ctx,
654 TABLES,
655 vec![],
656 projects,
657 filters,
658 like_field,
659 sort,
660 stmt.kind,
661 )
662 .await
663}
664
665pub async fn show_collations(
667 kind: ShowKind,
668 query_engine: &QueryEngineRef,
669 catalog_manager: &CatalogManagerRef,
670 query_ctx: QueryContextRef,
671) -> Result<Output> {
672 let projects = vec![
674 ("collation_name", "Collation"),
675 ("character_set_name", "Charset"),
676 ("id", "Id"),
677 ("is_default", "Default"),
678 ("is_compiled", "Compiled"),
679 ("sortlen", "Sortlen"),
680 ];
681
682 let filters = vec![];
683 let like_field = Some("collation_name");
684 let sort = vec![];
685
686 query_from_information_schema_table(
687 query_engine,
688 catalog_manager,
689 query_ctx,
690 COLLATIONS,
691 vec![],
692 projects,
693 filters,
694 like_field,
695 sort,
696 kind,
697 )
698 .await
699}
700
701pub async fn show_charsets(
703 kind: ShowKind,
704 query_engine: &QueryEngineRef,
705 catalog_manager: &CatalogManagerRef,
706 query_ctx: QueryContextRef,
707) -> Result<Output> {
708 let projects = vec![
710 ("character_set_name", "Charset"),
711 ("description", "Description"),
712 ("default_collate_name", "Default collation"),
713 ("maxlen", "Maxlen"),
714 ];
715
716 let filters = vec![];
717 let like_field = Some("character_set_name");
718 let sort = vec![];
719
720 query_from_information_schema_table(
721 query_engine,
722 catalog_manager,
723 query_ctx,
724 CHARACTER_SETS,
725 vec![],
726 projects,
727 filters,
728 like_field,
729 sort,
730 kind,
731 )
732 .await
733}
734
735pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
736 let variable = stmt.variable.to_string().to_uppercase();
737 let value = match variable.as_str() {
738 "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
739 "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
740 "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
741 "DATESTYLE" => {
742 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
743 format!("{}, {}", style, order)
744 }
745 "MAX_EXECUTION_TIME" => {
746 if query_ctx.channel() == Channel::Mysql {
747 query_ctx.query_timeout_as_millis().to_string()
748 } else {
749 return UnsupportedVariableSnafu { name: variable }.fail();
750 }
751 }
752 "STATEMENT_TIMEOUT" => {
753 if query_ctx.channel() == Channel::Postgres {
755 let mut timeout = query_ctx.query_timeout_as_millis().to_string();
756 timeout.push_str("ms");
757 timeout
758 } else {
759 return UnsupportedVariableSnafu { name: variable }.fail();
760 }
761 }
762 _ => return UnsupportedVariableSnafu { name: variable }.fail(),
763 };
764 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
765 variable,
766 ConcreteDataType::string_datatype(),
767 false,
768 )]));
769 let records = RecordBatches::try_from_columns(
770 schema,
771 vec![Arc::new(StringVector::from(vec![value])) as _],
772 )
773 .context(error::CreateRecordBatchSnafu)?;
774 Ok(Output::new_with_record_batches(records))
775}
776
777pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
778 let schema = Arc::new(Schema::new(vec![
779 ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
780 ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
781 ]));
782 let records = RecordBatches::try_from_columns(
783 schema,
784 vec![
785 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
786 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
787 ],
788 )
789 .context(error::CreateRecordBatchSnafu)?;
790 Ok(Output::new_with_record_batches(records))
791}
792
793pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
794 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
795 "search_path",
796 ConcreteDataType::string_datatype(),
797 false,
798 )]));
799 let records = RecordBatches::try_from_columns(
800 schema,
801 vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
802 )
803 .context(error::CreateRecordBatchSnafu)?;
804 Ok(Output::new_with_record_batches(records))
805}
806
807pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
808 let stmt = CreateDatabase {
809 name: ObjectName::from(vec![Ident::new(database_name)]),
810 if_not_exists: true,
811 options,
812 };
813 let sql = format!("{stmt}");
814 let columns = vec![
815 Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
816 Arc::new(StringVector::from(vec![sql])) as _,
817 ];
818 let records =
819 RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
820 .context(error::CreateRecordBatchSnafu)?;
821 Ok(Output::new_with_record_batches(records))
822}
823
824pub fn show_create_table(
825 table_info: TableInfoRef,
826 schema_options: Option<SchemaOptions>,
827 partitions: Option<Partitions>,
828 query_ctx: QueryContextRef,
829) -> Result<Output> {
830 let table_name = table_info.name.clone();
831
832 let quote_style = query_ctx.quote_style();
833
834 let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
835 stmt.partitions = partitions.map(|mut p| {
836 p.set_quote(quote_style);
837 p
838 });
839 let sql = format!("{}", stmt);
840 let columns = vec![
841 Arc::new(StringVector::from(vec![table_name])) as _,
842 Arc::new(StringVector::from(vec![sql])) as _,
843 ];
844 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
845 .context(error::CreateRecordBatchSnafu)?;
846
847 Ok(Output::new_with_record_batches(records))
848}
849
/// Renders a Postgres `CREATE FOREIGN TABLE` statement for `table`, so the
/// table can be mounted through a Postgres FDW pointing at GreptimeDB.
/// Metric-engine internal columns are excluded; each remaining column is
/// emitted with its Postgres data type name.
pub fn show_create_foreign_table_for_pg(
    table: TableRef,
    _query_ctx: QueryContextRef,
) -> Result<Output> {
    let table_info = table.table_info();

    let table_meta = &table_info.meta;
    let table_name = &table_info.name;
    let schema = &table_info.meta.schema;
    let is_metric_engine = is_metric_engine(&table_meta.engine);

    // Build the column list: skip metric-engine bookkeeping columns, and
    // map each data type to its Postgres spelling.
    let columns = schema
        .column_schemas()
        .iter()
        .filter_map(|c| {
            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
                None
            } else {
                Some(format!(
                    "\"{}\" {}",
                    c.name,
                    c.data_type.postgres_datatype_name()
                ))
            }
        })
        .join(",\n  ");

    // The foreign table is prefixed "ft_" and bound to the original table
    // name through the FDW OPTIONS clause.
    let sql = format!(
        r#"CREATE FOREIGN TABLE ft_{} (
  {}
)
SERVER greptimedb
OPTIONS (table_name '{}')"#,
        table_name, columns, table_name
    );

    let columns = vec![
        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}
895
896pub fn show_create_view(
897 view_name: ObjectName,
898 definition: &str,
899 query_ctx: QueryContextRef,
900) -> Result<Output> {
901 let mut parser_ctx =
902 ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
903
904 let Statement::CreateView(create_view) =
905 parser_ctx.parse_statement().context(error::SqlSnafu)?
906 else {
907 unreachable!();
909 };
910
911 let stmt = CreateView {
912 name: view_name.clone(),
913 columns: create_view.columns,
914 query: create_view.query,
915 or_replace: create_view.or_replace,
916 if_not_exists: create_view.if_not_exists,
917 };
918
919 let sql = format!("{}", stmt);
920 let columns = vec![
921 Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
922 Arc::new(StringVector::from(vec![sql])) as _,
923 ];
924 let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
925 .context(error::CreateRecordBatchSnafu)?;
926
927 Ok(Output::new_with_record_batches(records))
928}
929
930pub async fn show_views(
932 stmt: ShowViews,
933 query_engine: &QueryEngineRef,
934 catalog_manager: &CatalogManagerRef,
935 query_ctx: QueryContextRef,
936) -> Result<Output> {
937 let schema_name = if let Some(database) = stmt.database {
938 database
939 } else {
940 query_ctx.current_schema()
941 };
942
943 let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
944 let filters = vec![
945 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
946 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
947 ];
948 let like_field = Some(tables::TABLE_NAME);
949 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
950
951 query_from_information_schema_table(
952 query_engine,
953 catalog_manager,
954 query_ctx,
955 VIEWS,
956 vec![],
957 projects,
958 filters,
959 like_field,
960 sort,
961 stmt.kind,
962 )
963 .await
964}
965
966pub async fn show_flows(
968 stmt: ShowFlows,
969 query_engine: &QueryEngineRef,
970 catalog_manager: &CatalogManagerRef,
971 query_ctx: QueryContextRef,
972) -> Result<Output> {
973 let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
974 let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
975 let like_field = Some(flows::FLOW_NAME);
976 let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
977
978 query_from_information_schema_table(
979 query_engine,
980 catalog_manager,
981 query_ctx,
982 FLOWS,
983 vec![],
984 projects,
985 filters,
986 like_field,
987 sort,
988 stmt.kind,
989 )
990 .await
991}
992
#[cfg(feature = "enterprise")]
/// Executes `SHOW TRIGGERS` (enterprise only) by querying
/// `information_schema.triggers`, sorted by trigger name.
pub async fn show_triggers(
    stmt: sql::statements::show::trigger::ShowTriggers,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Source column in information_schema.triggers and its output header.
    const TRIGGER_NAME: &str = "trigger_name";
    const TRIGGERS_COLUMN: &str = "Triggers";

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        catalog::information_schema::TRIGGERS,
        vec![],
        vec![(TRIGGER_NAME, TRIGGERS_COLUMN)],
        vec![],
        Some(TRIGGER_NAME),
        vec![col(TRIGGER_NAME).sort(true, true)],
        stmt.kind,
    )
    .await
}
1021
1022pub fn show_create_flow(
1023 flow_name: ObjectName,
1024 flow_val: FlowInfoValue,
1025 query_ctx: QueryContextRef,
1026) -> Result<Output> {
1027 let mut parser_ctx =
1028 ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
1029
1030 let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;
1031
1032 let raw_query = match &query {
1034 Statement::Tql(_) => flow_val.raw_sql().clone(),
1035 _ => query.to_string(),
1036 };
1037
1038 let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);
1039
1040 let comment = if flow_val.comment().is_empty() {
1041 None
1042 } else {
1043 Some(flow_val.comment().clone())
1044 };
1045
1046 let stmt = CreateFlow {
1047 flow_name,
1048 sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
1049 or_replace: false,
1052 if_not_exists: true,
1053 expire_after: flow_val.expire_after(),
1054 eval_interval: flow_val.eval_interval(),
1055 comment,
1056 query,
1057 };
1058
1059 let sql = format!("{}", stmt);
1060 let columns = vec![
1061 Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
1062 Arc::new(StringVector::from(vec![sql])) as _,
1063 ];
1064 let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
1065 .context(error::CreateRecordBatchSnafu)?;
1066
1067 Ok(Output::new_with_record_batches(records))
1068}
1069
1070pub fn describe_table(table: TableRef) -> Result<Output> {
1071 let table_info = table.table_info();
1072 let columns_schemas = table_info.meta.schema.column_schemas();
1073 let columns = vec![
1074 describe_column_names(columns_schemas),
1075 describe_column_types(columns_schemas),
1076 describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1077 describe_column_nullables(columns_schemas),
1078 describe_column_defaults(columns_schemas),
1079 describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1080 ];
1081 let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1082 .context(error::CreateRecordBatchSnafu)?;
1083 Ok(Output::new_with_record_batches(records))
1084}
1085
1086fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1087 Arc::new(StringVector::from_iterator(
1088 columns_schemas.iter().map(|cs| cs.name.as_str()),
1089 ))
1090}
1091
1092fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1093 Arc::new(StringVector::from(
1094 columns_schemas
1095 .iter()
1096 .map(|cs| cs.data_type.name())
1097 .collect::<Vec<_>>(),
1098 ))
1099}
1100
1101fn describe_column_keys(
1102 columns_schemas: &[ColumnSchema],
1103 primary_key_indices: &[usize],
1104) -> VectorRef {
1105 Arc::new(StringVector::from_iterator(
1106 columns_schemas.iter().enumerate().map(|(i, cs)| {
1107 if cs.is_time_index() || primary_key_indices.contains(&i) {
1108 PRI_KEY
1109 } else {
1110 ""
1111 }
1112 }),
1113 ))
1114}
1115
1116fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1117 Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1118 |cs| {
1119 if cs.is_nullable() { YES_STR } else { NO_STR }
1120 },
1121 )))
1122}
1123
1124fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1125 Arc::new(StringVector::from(
1126 columns_schemas
1127 .iter()
1128 .map(|cs| {
1129 cs.default_constraint()
1130 .map_or(String::from(""), |dc| dc.to_string())
1131 })
1132 .collect::<Vec<String>>(),
1133 ))
1134}
1135
1136fn describe_column_semantic_types(
1137 columns_schemas: &[ColumnSchema],
1138 primary_key_indices: &[usize],
1139) -> VectorRef {
1140 Arc::new(StringVector::from_iterator(
1141 columns_schemas.iter().enumerate().map(|(i, cs)| {
1142 if primary_key_indices.contains(&i) {
1143 SEMANTIC_TYPE_PRIMARY_KEY
1144 } else if cs.is_time_index() {
1145 SEMANTIC_TYPE_TIME_INDEX
1146 } else {
1147 SEMANTIC_TYPE_FIELD
1148 }
1149 }),
1150 ))
1151}
1152
1153pub async fn prepare_file_table_files(
1155 options: &HashMap<String, String>,
1156) -> Result<(ObjectStore, Vec<String>)> {
1157 let url = options
1158 .get(FILE_TABLE_LOCATION_KEY)
1159 .context(error::MissingRequiredFieldSnafu {
1160 name: FILE_TABLE_LOCATION_KEY,
1161 })?;
1162
1163 let (dir, filename) = find_dir_and_filename(url);
1164 let source = if let Some(filename) = filename {
1165 Source::Filename(filename)
1166 } else {
1167 Source::Dir
1168 };
1169 let regex = options
1170 .get(FILE_TABLE_PATTERN_KEY)
1171 .map(|x| Regex::new(x))
1172 .transpose()
1173 .context(error::BuildRegexSnafu)?;
1174 let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1175 let lister = Lister::new(object_store.clone(), source, dir, regex);
1176 let files = lister
1181 .list()
1182 .await
1183 .context(error::ListObjectsSnafu)?
1184 .into_iter()
1185 .filter_map(|entry| {
1186 if entry.path().ends_with('/') {
1187 None
1188 } else {
1189 Some(entry.path().to_string())
1190 }
1191 })
1192 .collect::<Vec<_>>();
1193 Ok((object_store, files))
1194}
1195
1196pub async fn infer_file_table_schema(
1197 object_store: &ObjectStore,
1198 files: &[String],
1199 options: &HashMap<String, String>,
1200) -> Result<RawSchema> {
1201 let format = parse_file_table_format(options)?;
1202 let merged = infer_schemas(object_store, files, format.as_ref())
1203 .await
1204 .context(error::InferSchemaSnafu)?;
1205 Ok(RawSchema::from(
1206 &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1207 ))
1208}
1209
1210pub fn file_column_schemas_to_table(
1220 file_column_schemas: &[ColumnSchema],
1221) -> (Vec<ColumnSchema>, String) {
1222 let mut column_schemas = file_column_schemas.to_owned();
1223 if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1224 let time_index = time_index_column.name.clone();
1225 return (column_schemas, time_index);
1226 }
1227
1228 let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1229 let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1230 let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
1231 .with_time_index(true)
1232 .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1233 .unwrap();
1234
1235 if let Some(column_schema) = column_schemas
1236 .iter_mut()
1237 .find(|column_schema| column_schema.name == greptime_timestamp())
1238 {
1239 *column_schema = timestamp_column_schema;
1241 } else {
1242 column_schemas.push(timestamp_column_schema);
1243 }
1244
1245 (column_schemas, greptime_timestamp().to_string())
1246}
1247
1248pub fn check_file_to_table_schema_compatibility(
1257 file_column_schemas: &[ColumnSchema],
1258 table_column_schemas: &[ColumnSchema],
1259) -> Result<()> {
1260 let file_schemas_map = file_column_schemas
1261 .iter()
1262 .map(|s| (s.name.clone(), s))
1263 .collect::<HashMap<_, _>>();
1264
1265 for table_column in table_column_schemas {
1266 if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1267 ensure!(
1269 file_column
1270 .data_type
1271 .can_arrow_type_cast_to(&table_column.data_type),
1272 error::ColumnSchemaIncompatibleSnafu {
1273 column: table_column.name.clone(),
1274 file_type: file_column.data_type.clone(),
1275 table_type: table_column.data_type.clone(),
1276 }
1277 );
1278 } else {
1279 ensure!(
1280 table_column.is_nullable() || table_column.default_constraint().is_some(),
1281 error::ColumnSchemaNoDefaultSnafu {
1282 column: table_column.name.clone(),
1283 }
1284 );
1285 }
1286 }
1287
1288 Ok(())
1289}
1290
1291fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1292 Ok(
1293 match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1294 Format::Csv(format) => Box::new(format),
1295 Format::Json(format) => Box::new(format),
1296 Format::Parquet(format) => Box::new(format),
1297 Format::Orc(format) => Box::new(format),
1298 },
1299 )
1300}
1301
1302pub async fn show_processlist(
1303 stmt: ShowProcessList,
1304 query_engine: &QueryEngineRef,
1305 catalog_manager: &CatalogManagerRef,
1306 query_ctx: QueryContextRef,
1307) -> Result<Output> {
1308 let projects = if stmt.full {
1309 vec![
1310 (process_list::ID, "Id"),
1311 (process_list::CATALOG, "Catalog"),
1312 (process_list::SCHEMAS, "Schema"),
1313 (process_list::CLIENT, "Client"),
1314 (process_list::FRONTEND, "Frontend"),
1315 (process_list::START_TIMESTAMP, "Start Time"),
1316 (process_list::ELAPSED_TIME, "Elapsed Time"),
1317 (process_list::QUERY, "Query"),
1318 ]
1319 } else {
1320 vec![
1321 (process_list::ID, "Id"),
1322 (process_list::CATALOG, "Catalog"),
1323 (process_list::QUERY, "Query"),
1324 (process_list::ELAPSED_TIME, "Elapsed Time"),
1325 ]
1326 };
1327
1328 let filters = vec![];
1329 let like_field = None;
1330 let sort = vec![col("id").sort(true, true)];
1331 query_from_information_schema_table(
1332 query_engine,
1333 catalog_manager,
1334 query_ctx.clone(),
1335 "process_list",
1336 vec![],
1337 projects.clone(),
1338 filters,
1339 like_field,
1340 sort,
1341 ShowKind::All,
1342 )
1343 .await
1344}
1345
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_query::{Output, OutputData};
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_time::Timezone;
    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
    use session::context::QueryContextBuilder;
    use snafu::ResultExt;
    use sql::ast::{Ident, ObjectName};
    use sql::statements::show::ShowVariables;
    use table::TableRef;
    use table::test_util::MemTable;

    use super::show_variable;
    use crate::error;
    use crate::error::Result;
    use crate::sql::{
        DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
        YES_STR, describe_table,
    };

    // Verifies DESCRIBE output for a two-column table: a nullable uint32 field
    // and a non-null millisecond timestamp that is the time index with a
    // `current_timestamp()` default.
    #[test]
    fn test_describe_table_multiple_columns() -> Result<()> {
        let table_name = "test_table";
        let schema = vec![
            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(
                "t2",
                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
                false,
            )
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
                "current_timestamp()",
            ))))
            .unwrap()
            .with_time_index(true),
        ];
        // One row of data per column; the values themselves are irrelevant to
        // DESCRIBE, which reads only the schema.
        let data = vec![
            Arc::new(UInt32Vector::from_slice([0])) as _,
            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
        ];
        // Expected DESCRIBE columns, in order: name, type, key, nullable,
        // default, semantic type.
        let expected_columns = vec![
            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
            Arc::new(StringVector::from(vec![
                SEMANTIC_TYPE_FIELD,
                SEMANTIC_TYPE_TIME_INDEX,
            ])) as _,
        ];

        describe_table_test_by_schema(table_name, schema, data, expected_columns)
    }

    // Runs `describe_table` against an in-memory table built from `schema`
    // and `data`, and asserts the record batches equal `expected_columns`.
    fn describe_table_test_by_schema(
        table_name: &str,
        schema: Vec<ColumnSchema>,
        data: Vec<VectorRef>,
        expected_columns: Vec<VectorRef>,
    ) -> Result<()> {
        let table_schema = SchemaRef::new(Schema::new(schema));
        let table = prepare_describe_table(table_name, table_schema, data);

        let expected =
            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
                .context(error::CreateRecordBatchSnafu)?;

        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
            assert_eq!(res.take(), expected.take());
        } else {
            panic!("describe table must return record batch");
        }

        Ok(())
    }

    // Builds a single-batch in-memory table for the DESCRIBE tests.
    fn prepare_describe_table(
        table_name: &str,
        table_schema: SchemaRef,
        data: Vec<VectorRef>,
    ) -> TableRef {
        let record_batch = RecordBatch::new(table_schema, data).unwrap();
        MemTable::table(table_name, record_batch)
    }

    // SHOW VARIABLES behavior: system timezone variables report "UTC"
    // regardless of the session timezone, session timezone variables echo the
    // session setting, and variable names containing spaces are rejected.
    #[test]
    fn test_show_variable() {
        assert_eq!(
            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert_eq!(
            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
    }

    // Executes SHOW VARIABLES for `variable` under a session whose timezone is
    // `tz`, returning the first cell of the result as a string.
    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
        let stmt = ShowVariables {
            variable: ObjectName::from(vec![Ident::new(variable)]),
        };
        let ctx = Arc::new(
            QueryContextBuilder::default()
                .timezone(Timezone::from_tz_string(tz).unwrap())
                .build(),
        );
        match show_variable(stmt, ctx) {
            Ok(Output {
                data: OutputData::RecordBatches(record),
                ..
            }) => {
                // The result is a single batch with a single string column.
                let record = record.take().first().cloned().unwrap();
                Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
            }
            Ok(_) => unreachable!(),
            Err(e) => Err(e),
        }
    }
}