mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 01:10:37 +00:00
refactor: show tables and show databases (#3423)
* refactor: show tables and show databases * chore: clean code
This commit is contained in:
@@ -19,9 +19,9 @@ mod partitions;
|
||||
mod predicate;
|
||||
mod region_peers;
|
||||
mod runtime_metrics;
|
||||
mod schemata;
|
||||
pub mod schemata;
|
||||
mod table_names;
|
||||
mod tables;
|
||||
pub mod tables;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
@@ -37,8 +37,8 @@ use crate::error::{
|
||||
use crate::information_schema::{InformationTable, Predicates};
|
||||
use crate::CatalogManager;
|
||||
|
||||
const CATALOG_NAME: &str = "catalog_name";
|
||||
const SCHEMA_NAME: &str = "schema_name";
|
||||
pub const CATALOG_NAME: &str = "catalog_name";
|
||||
pub const SCHEMA_NAME: &str = "schema_name";
|
||||
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
|
||||
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
|
||||
const INIT_CAPACITY: usize = 42;
|
||||
|
||||
@@ -39,10 +39,10 @@ use crate::error::{
|
||||
use crate::information_schema::{InformationTable, Predicates};
|
||||
use crate::CatalogManager;
|
||||
|
||||
const TABLE_CATALOG: &str = "table_catalog";
|
||||
const TABLE_SCHEMA: &str = "table_schema";
|
||||
const TABLE_NAME: &str = "table_name";
|
||||
const TABLE_TYPE: &str = "table_type";
|
||||
pub const TABLE_CATALOG: &str = "table_catalog";
|
||||
pub const TABLE_SCHEMA: &str = "table_schema";
|
||||
pub const TABLE_NAME: &str = "table_name";
|
||||
pub const TABLE_TYPE: &str = "table_type";
|
||||
const TABLE_ID: &str = "table_id";
|
||||
const ENGINE: &str = "engine";
|
||||
const INIT_CAPACITY: usize = 42;
|
||||
|
||||
@@ -33,7 +33,7 @@ impl StatementExecutor {
|
||||
stmt: ShowDatabases,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
query::sql::show_databases(stmt, self.catalog_manager.clone(), query_ctx)
|
||||
query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
|
||||
.await
|
||||
.context(ExecuteStatementSnafu)
|
||||
}
|
||||
@@ -44,7 +44,7 @@ impl StatementExecutor {
|
||||
stmt: ShowTables,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
query::sql::show_tables(stmt, self.catalog_manager.clone(), query_ctx)
|
||||
query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
|
||||
.await
|
||||
.context(ExecuteStatementSnafu)
|
||||
}
|
||||
|
||||
@@ -30,32 +30,26 @@ use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan, PhysicalP
|
||||
use common_query::prelude::ScalarUdf;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{
|
||||
EmptyRecordBatchStream, RecordBatch, RecordBatches, SendableRecordBatchStream,
|
||||
};
|
||||
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
|
||||
use common_telemetry::tracing;
|
||||
use datafusion::common::Column;
|
||||
use datafusion::physical_plan::analyze::AnalyzeExec;
|
||||
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
|
||||
use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion_common::{ResolvedTableReference, ScalarValue};
|
||||
use datafusion_expr::{DmlStatement, Expr as DfExpr, LogicalPlan as DfLogicalPlan, WriteOp};
|
||||
use datafusion_common::ResolvedTableReference;
|
||||
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::Schema;
|
||||
use futures_util::StreamExt;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::ast::{BinaryOperator, Expr, Value};
|
||||
use table::requests::{DeleteRequest, InsertRequest};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::dataframe::DataFrame;
|
||||
pub use crate::datafusion::planner::DfContextProviderAdapter;
|
||||
use crate::error::{
|
||||
CatalogSnafu, CreateRecordBatchSnafu, CreateSchemaSnafu, DataFusionSnafu,
|
||||
MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result,
|
||||
TableMutationSnafu, TableNotFoundSnafu, UnimplementedSnafu, UnsupportedExprSnafu,
|
||||
CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu,
|
||||
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
|
||||
TableNotFoundSnafu, UnsupportedExprSnafu,
|
||||
};
|
||||
use crate::executor::QueryExecutor;
|
||||
use crate::logical_optimizer::LogicalOptimizer;
|
||||
@@ -456,78 +450,6 @@ impl QueryExecutor for DatafusionQueryEngine {
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_filter_to_df_filter(filter: Expr) -> Result<DfExpr> {
|
||||
match filter {
|
||||
Expr::BinaryOp { left, op, right } => {
|
||||
let left = convert_filter_to_df_filter(*left)?;
|
||||
let right = convert_filter_to_df_filter(*right)?;
|
||||
match op {
|
||||
BinaryOperator::Eq => Ok(left.eq(right)),
|
||||
_ => UnimplementedSnafu {
|
||||
operation: format!("convert BinaryOperator into datafusion Expr, op: {op}"),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
Expr::Value(value) => match value {
|
||||
Value::SingleQuotedString(v) => Ok(DfExpr::Literal(ScalarValue::Utf8(Some(v)))),
|
||||
_ => UnimplementedSnafu {
|
||||
operation: format!("convert Expr::Value into datafusion Expr, value: {value}"),
|
||||
}
|
||||
.fail(),
|
||||
},
|
||||
Expr::Identifier(ident) => Ok(DfExpr::Column(Column::from_name(ident.value))),
|
||||
_ => UnimplementedSnafu {
|
||||
operation: format!("convert Expr into datafusion Expr, Expr: {filter}"),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a table in memory and executes a show statement on the table.
|
||||
pub async fn execute_show_with_filter(
|
||||
record_batch: RecordBatch,
|
||||
filter: Option<Expr>,
|
||||
) -> Result<Output> {
|
||||
let table_name = "table_name";
|
||||
let column_schemas = record_batch.schema.column_schemas().to_vec();
|
||||
let context = SessionContext::new();
|
||||
context
|
||||
.register_batch(table_name, record_batch.into_df_record_batch())
|
||||
.context(error::DatafusionSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)?;
|
||||
let mut dataframe = context
|
||||
.sql(&format!("SELECT * FROM {table_name}"))
|
||||
.await
|
||||
.context(error::DatafusionSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)?;
|
||||
if let Some(filter) = filter {
|
||||
let filter = convert_filter_to_df_filter(filter)?;
|
||||
dataframe = dataframe
|
||||
.filter(filter)
|
||||
.context(error::DatafusionSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)?
|
||||
}
|
||||
let df_batches = dataframe
|
||||
.collect()
|
||||
.await
|
||||
.context(error::DatafusionSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)?;
|
||||
let mut batches = Vec::with_capacity(df_batches.len());
|
||||
let schema = Arc::new(Schema::try_new(column_schemas).context(CreateSchemaSnafu)?);
|
||||
for df_batch in df_batches.into_iter() {
|
||||
let batch = RecordBatch::try_from_df_record_batch(schema.clone(), df_batch)
|
||||
.context(CreateRecordBatchSnafu)?;
|
||||
batches.push(batch);
|
||||
}
|
||||
let record_batches = RecordBatches::try_new(schema, batches).context(CreateRecordBatchSnafu)?;
|
||||
Ok(Output::RecordBatches(record_batches))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::borrow::Cow::Borrowed;
|
||||
@@ -536,17 +458,12 @@ mod tests {
|
||||
use catalog::RegisterTableRequest;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
|
||||
use common_query::Output;
|
||||
use common_recordbatch::{util, RecordBatch};
|
||||
use common_recordbatch::util;
|
||||
use datafusion::prelude::{col, lit};
|
||||
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::types::StringType;
|
||||
use datatypes::vectors::{Helper, StringVectorBuilder, UInt32Vector, UInt64Vector, VectorRef};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
|
||||
use session::context::QueryContext;
|
||||
use sql::dialect::GreptimeDbDialect;
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
use sql::statements::show::{ShowKind, ShowTables};
|
||||
use sql::statements::statement::Statement;
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
|
||||
use super::*;
|
||||
@@ -691,71 +608,4 @@ mod tests {
|
||||
);
|
||||
assert_eq!("Limit: skip=0, fetch=20\n Aggregate: groupBy=[[]], aggr=[[SUM(CAST(numbers.number AS UInt64))]]\n TableScan: numbers projection=[number]", format!("{}", logical_plan.display_indent()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_show_tables() {
|
||||
// No filter
|
||||
let column_schemas = vec![ColumnSchema::new(
|
||||
"Tables",
|
||||
ConcreteDataType::String(StringType),
|
||||
false,
|
||||
)];
|
||||
let schema = Arc::new(Schema::new(column_schemas));
|
||||
let mut builder = StringVectorBuilder::with_capacity(3);
|
||||
builder.push(Some("monitor"));
|
||||
builder.push(Some("system_metrics"));
|
||||
let columns = vec![builder.to_vector()];
|
||||
let record_batch = RecordBatch::new(schema, columns).unwrap();
|
||||
let output = execute_show_with_filter(record_batch, None).await.unwrap();
|
||||
let Output::RecordBatches(record_batches) = output else {
|
||||
unreachable!()
|
||||
};
|
||||
let expected = "\
|
||||
+----------------+
|
||||
| Tables |
|
||||
+----------------+
|
||||
| monitor |
|
||||
| system_metrics |
|
||||
+----------------+";
|
||||
assert_eq!(record_batches.pretty_print().unwrap(), expected);
|
||||
|
||||
// Filter
|
||||
let column_schemas = vec![ColumnSchema::new(
|
||||
"Tables",
|
||||
ConcreteDataType::String(StringType),
|
||||
false,
|
||||
)];
|
||||
let schema = Arc::new(Schema::new(column_schemas));
|
||||
let mut builder = StringVectorBuilder::with_capacity(3);
|
||||
builder.push(Some("monitor"));
|
||||
builder.push(Some("system_metrics"));
|
||||
let columns = vec![builder.to_vector()];
|
||||
let record_batch = RecordBatch::new(schema, columns).unwrap();
|
||||
let statement = ParserContext::create_with_dialect(
|
||||
"SHOW TABLES WHERE \"Tables\"='monitor'",
|
||||
&GreptimeDbDialect {},
|
||||
ParseOptions::default(),
|
||||
)
|
||||
.unwrap()[0]
|
||||
.clone();
|
||||
let Statement::ShowTables(ShowTables { kind, .. }) = statement else {
|
||||
unreachable!()
|
||||
};
|
||||
let ShowKind::Where(filter) = kind else {
|
||||
unreachable!()
|
||||
};
|
||||
let output = execute_show_with_filter(record_batch, Some(filter))
|
||||
.await
|
||||
.unwrap();
|
||||
let Output::RecordBatches(record_batches) = output else {
|
||||
unreachable!()
|
||||
};
|
||||
let expected = "\
|
||||
+---------+
|
||||
| Tables |
|
||||
+---------+
|
||||
| monitor |
|
||||
+---------+";
|
||||
assert_eq!(record_batches.pretty_print().unwrap(), expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,12 +49,16 @@ impl DfContextProviderAdapter {
|
||||
pub(crate) async fn try_new(
|
||||
engine_state: Arc<QueryEngineState>,
|
||||
session_state: SessionState,
|
||||
df_stmt: &DfStatement,
|
||||
df_stmt: Option<&DfStatement>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Self> {
|
||||
let table_names = session_state
|
||||
.resolve_table_references(df_stmt)
|
||||
.context(DataFusionSnafu)?;
|
||||
let table_names = if let Some(df_stmt) = df_stmt {
|
||||
session_state
|
||||
.resolve_table_references(df_stmt)
|
||||
.context(DataFusionSnafu)?
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
let mut table_provider = DfTableSourceProvider::new(
|
||||
engine_state.catalog_manager().clone(),
|
||||
|
||||
@@ -54,24 +54,12 @@ pub enum Error {
|
||||
#[snafu(display("Table not found: {}", table))]
|
||||
TableNotFound { table: String, location: Location },
|
||||
|
||||
#[snafu(display("Failed to do vector computation"))]
|
||||
VectorComputation {
|
||||
source: datatypes::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create RecordBatch"))]
|
||||
CreateRecordBatch {
|
||||
source: common_recordbatch::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create Schema"))]
|
||||
CreateSchema {
|
||||
source: datatypes::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failure during query execution"))]
|
||||
QueryExecution {
|
||||
source: BoxedError,
|
||||
@@ -291,9 +279,7 @@ impl ErrorExt for Error {
|
||||
|
||||
QueryAccessDenied { .. } => StatusCode::AccessDenied,
|
||||
Catalog { source, .. } => source.status_code(),
|
||||
VectorComputation { source, .. } | ConvertDatafusionSchema { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
ConvertDatafusionSchema { source, .. } => source.status_code(),
|
||||
CreateRecordBatch { source, .. } => source.status_code(),
|
||||
QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
|
||||
DataFusion { error, .. } => match error {
|
||||
@@ -306,7 +292,6 @@ impl ErrorExt for Error {
|
||||
Sql { source, .. } => source.status_code(),
|
||||
PlanSql { .. } => StatusCode::PlanQuery,
|
||||
ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(),
|
||||
CreateSchema { source, .. } => source.status_code(),
|
||||
|
||||
RegionQuery { source, .. } => source.status_code(),
|
||||
TableMutation { source, .. } => source.status_code(),
|
||||
|
||||
@@ -12,18 +12,23 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use catalog::table_source::DfTableSourceProvider;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::tracing;
|
||||
use datafusion::common::DFSchema;
|
||||
use datafusion::execution::context::SessionState;
|
||||
use datafusion::sql::planner::PlannerContext;
|
||||
use datafusion_expr::Expr as DfExpr;
|
||||
use datafusion_sql::planner::{ParserOptions, SqlToRel};
|
||||
use promql::planner::PromPlanner;
|
||||
use promql_parser::parser::EvalStmt;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sql::ast::Expr as SqlExpr;
|
||||
use sql::statements::statement::Statement;
|
||||
|
||||
use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
|
||||
@@ -36,6 +41,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
|
||||
#[async_trait]
|
||||
pub trait LogicalPlanner: Send + Sync {
|
||||
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
|
||||
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
pub struct DfLogicalPlanner {
|
||||
@@ -65,7 +72,7 @@ impl DfLogicalPlanner {
|
||||
let context_provider = DfContextProviderAdapter::try_new(
|
||||
self.engine_state.clone(),
|
||||
self.session_state.clone(),
|
||||
&df_stmt,
|
||||
Some(&df_stmt),
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
@@ -95,6 +102,36 @@ impl DfLogicalPlanner {
|
||||
Ok(LogicalPlan::DfPlan(plan))
|
||||
}
|
||||
|
||||
/// Generate a relational expression from a SQL expression
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn sql_to_expr(
|
||||
&self,
|
||||
sql: SqlExpr,
|
||||
schema: &DFSchema,
|
||||
normalize_ident: bool,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<DfExpr> {
|
||||
let context_provider = DfContextProviderAdapter::try_new(
|
||||
self.engine_state.clone(),
|
||||
self.session_state.clone(),
|
||||
None,
|
||||
query_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let config_options = self.session_state.config().options();
|
||||
let parser_options = ParserOptions {
|
||||
enable_ident_normalization: normalize_ident,
|
||||
parse_float_as_decimal: config_options.sql_parser.parse_float_as_decimal,
|
||||
};
|
||||
|
||||
let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
|
||||
|
||||
sql_to_rel
|
||||
.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())
|
||||
.context(DataFusionSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
|
||||
let table_provider = DfTableSourceProvider::new(
|
||||
@@ -119,4 +156,8 @@ impl LogicalPlanner for DfLogicalPlanner {
|
||||
QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,22 +17,28 @@ mod show_create_table;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::information_schema::{schemata, tables, SCHEMATA, TABLES};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{
|
||||
SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
|
||||
INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
|
||||
SEMANTIC_TYPE_TIME_INDEX,
|
||||
};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_datasource::file_format::{infer_schemas, FileFormat, Format};
|
||||
use common_datasource::lister::{Lister, Source};
|
||||
use common_datasource::object_store::build_backend;
|
||||
use common_datasource::util::find_dir_and_filename;
|
||||
use common_query::prelude::GREPTIME_TIMESTAMP;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::{RecordBatch, RecordBatches};
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use common_time::timezone::get_timezone;
|
||||
use common_time::Timestamp;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion_expr::{col, lit, Expr};
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
|
||||
use datatypes::vectors::{Helper, StringVector};
|
||||
use datatypes::vectors::StringVector;
|
||||
use object_store::ObjectStore;
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
@@ -44,11 +50,14 @@ use sql::statements::show::{ShowDatabases, ShowKind, ShowTables, ShowVariables};
|
||||
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::datafusion::execute_show_with_filter;
|
||||
use crate::dataframe::DataFrame;
|
||||
use crate::error::{self, Result, UnsupportedVariableSnafu};
|
||||
use crate::planner::DfLogicalPlanner;
|
||||
use crate::QueryEngineRef;
|
||||
|
||||
const SCHEMAS_COLUMN: &str = "Schemas";
|
||||
const SCHEMAS_COLUMN: &str = "Database";
|
||||
const TABLES_COLUMN: &str = "Tables";
|
||||
const TABLE_TYPE_COLUMN: &str = "Table_type";
|
||||
const COLUMN_NAME_COLUMN: &str = "Column";
|
||||
const COLUMN_TYPE_COLUMN: &str = "Type";
|
||||
const COLUMN_KEY_COLUMN: &str = "Key";
|
||||
@@ -100,49 +109,144 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
|
||||
|
||||
pub async fn show_databases(
|
||||
stmt: ShowDatabases,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_engine: &QueryEngineRef,
|
||||
catalog_manager: &CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let mut databases = catalog_manager
|
||||
.schema_names(query_ctx.current_catalog())
|
||||
let projects = vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)];
|
||||
|
||||
let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
|
||||
let like_field = Some(schemata::SCHEMA_NAME);
|
||||
let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
|
||||
|
||||
query_from_information_schema_table(
|
||||
query_engine,
|
||||
catalog_manager,
|
||||
query_ctx,
|
||||
SCHEMATA,
|
||||
projects,
|
||||
filters,
|
||||
like_field,
|
||||
sort,
|
||||
stmt.kind,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Cast a `show` statement execution into a query from tables in `information_schema`.
|
||||
/// - `table_name`: the table name in `information_schema`,
|
||||
/// - `projects`: query projection, a list of `(column, renamed_column)`,
|
||||
/// - `filters`: filter expressions for query,
|
||||
/// - `like_field`: the field to filter by the predicate `ShowKind::Like`,
|
||||
/// - `sort`: sort the results by the specified sorting expressions,
|
||||
/// - `kind`: the show kind
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn query_from_information_schema_table(
|
||||
query_engine: &QueryEngineRef,
|
||||
catalog_manager: &CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
table_name: &str,
|
||||
projects: Vec<(&str, &str)>,
|
||||
filters: Vec<Expr>,
|
||||
like_field: Option<&str>,
|
||||
sort: Vec<Expr>,
|
||||
kind: ShowKind,
|
||||
) -> Result<Output> {
|
||||
let table = catalog_manager
|
||||
.table(
|
||||
query_ctx.current_catalog(),
|
||||
INFORMATION_SCHEMA_NAME,
|
||||
table_name,
|
||||
)
|
||||
.await
|
||||
.context(error::CatalogSnafu)?;
|
||||
.context(error::CatalogSnafu)?
|
||||
.with_context(|| error::TableNotFoundSnafu {
|
||||
table: format_full_table_name(
|
||||
query_ctx.current_catalog(),
|
||||
INFORMATION_SCHEMA_NAME,
|
||||
table_name,
|
||||
),
|
||||
})?;
|
||||
|
||||
// TODO(dennis): Specify the order of the results in catalog manager API
|
||||
databases.sort();
|
||||
let DataFrame::DataFusion(dataframe) = query_engine.read_table(table)?;
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
SCHEMAS_COLUMN,
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
)]));
|
||||
match stmt.kind {
|
||||
ShowKind::All => {
|
||||
let databases = Arc::new(StringVector::from(databases)) as _;
|
||||
let records = RecordBatches::try_from_columns(schema, vec![databases])
|
||||
.context(error::CreateRecordBatchSnafu)?;
|
||||
Ok(Output::RecordBatches(records))
|
||||
// Apply filters
|
||||
let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
|
||||
df.filter(expr).context(error::PlanSqlSnafu)
|
||||
})?;
|
||||
|
||||
// Apply `like` predicate if exists
|
||||
let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
|
||||
dataframe
|
||||
.filter(col(field).like(lit(ident.value.clone())))
|
||||
.context(error::PlanSqlSnafu)?
|
||||
} else {
|
||||
dataframe
|
||||
};
|
||||
|
||||
// Apply sorting
|
||||
let dataframe = dataframe
|
||||
.sort(sort)
|
||||
.context(error::PlanSqlSnafu)?
|
||||
.select_columns(&projects.iter().map(|(c, _)| *c).collect::<Vec<_>>())
|
||||
.context(error::PlanSqlSnafu)?;
|
||||
|
||||
// Apply projection
|
||||
let dataframe = projects
|
||||
.into_iter()
|
||||
.try_fold(dataframe, |df, (column, renamed_column)| {
|
||||
df.with_column_renamed(column, renamed_column)
|
||||
.context(error::PlanSqlSnafu)
|
||||
})?;
|
||||
|
||||
let dataframe = match kind {
|
||||
ShowKind::All | ShowKind::Like(_) => {
|
||||
// Like kind is processed above
|
||||
dataframe
|
||||
}
|
||||
ShowKind::Where(filter) => {
|
||||
let columns = vec![Arc::new(StringVector::from(databases)) as _];
|
||||
let record_batch =
|
||||
RecordBatch::new(schema, columns).context(error::CreateRecordBatchSnafu)?;
|
||||
let result = execute_show_with_filter(record_batch, Some(filter)).await?;
|
||||
Ok(result)
|
||||
// Cast the results into VIEW for `where` clause,
|
||||
// which is evaluated against the column names displayed by the SHOW statement.
|
||||
let view = dataframe.into_view();
|
||||
let dataframe = SessionContext::new_with_state(
|
||||
query_engine
|
||||
.engine_context(query_ctx.clone())
|
||||
.state()
|
||||
.clone(),
|
||||
)
|
||||
.read_table(view)
|
||||
.context(error::DataFusionSnafu)?;
|
||||
|
||||
let planner = query_engine.planner();
|
||||
let planner = planner
|
||||
.as_any()
|
||||
.downcast_ref::<DfLogicalPlanner>()
|
||||
.expect("Must be the datafusion planner");
|
||||
|
||||
let filter = planner
|
||||
.sql_to_expr(filter, dataframe.schema(), false, query_ctx)
|
||||
.await?;
|
||||
|
||||
// Apply the `where` clause filters
|
||||
dataframe.filter(filter).context(error::PlanSqlSnafu)?
|
||||
}
|
||||
ShowKind::Like(ident) => {
|
||||
let databases = Helper::like_utf8(databases, &ident.value)
|
||||
.context(error::VectorComputationSnafu)?;
|
||||
let records = RecordBatches::try_from_columns(schema, vec![databases])
|
||||
.context(error::CreateRecordBatchSnafu)?;
|
||||
Ok(Output::RecordBatches(records))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let stream = dataframe
|
||||
.execute_stream()
|
||||
.await
|
||||
.context(error::DataFusionSnafu)?;
|
||||
|
||||
Ok(Output::Stream(
|
||||
Box::pin(RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?),
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn show_tables(
|
||||
stmt: ShowTables,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_engine: &QueryEngineRef,
|
||||
catalog_manager: &CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let schema_name = if let Some(database) = stmt.database {
|
||||
@@ -150,85 +254,36 @@ pub async fn show_tables(
|
||||
} else {
|
||||
query_ctx.current_schema().to_owned()
|
||||
};
|
||||
// TODO(sunng87): move this function into query_ctx
|
||||
let mut tables = catalog_manager
|
||||
.table_names(query_ctx.current_catalog(), &schema_name)
|
||||
.await
|
||||
.context(error::CatalogSnafu)?;
|
||||
|
||||
// TODO(dennis): Specify the order of the results in schema provider API
|
||||
tables.sort();
|
||||
|
||||
let table_types: Option<Arc<dyn Vector>> = {
|
||||
if stmt.full {
|
||||
Some(
|
||||
get_table_types(
|
||||
&tables,
|
||||
catalog_manager.clone(),
|
||||
query_ctx.clone(),
|
||||
&schema_name,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
// (dennis): MySQL renames `table_name` to `Tables_in_{schema}`, but we use `Tables` instead.
|
||||
// I don't want to change this for now because our dashboard may depend on it.
|
||||
let projects = if stmt.full {
|
||||
vec![
|
||||
(tables::TABLE_NAME, TABLES_COLUMN),
|
||||
(tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
|
||||
]
|
||||
} else {
|
||||
vec![(tables::TABLE_NAME, TABLES_COLUMN)]
|
||||
};
|
||||
let filters = vec![
|
||||
col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
|
||||
col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
|
||||
];
|
||||
let like_field = Some(tables::TABLE_NAME);
|
||||
let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
|
||||
|
||||
let mut column_schema = vec![ColumnSchema::new(
|
||||
TABLES_COLUMN,
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
)];
|
||||
if table_types.is_some() {
|
||||
column_schema.push(ColumnSchema::new(
|
||||
"Table_type",
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
));
|
||||
}
|
||||
|
||||
let schema = Arc::new(Schema::new(column_schema));
|
||||
|
||||
match stmt.kind {
|
||||
ShowKind::All => {
|
||||
let tables = Arc::new(StringVector::from(tables)) as _;
|
||||
let mut columns = vec![tables];
|
||||
if let Some(table_types) = table_types {
|
||||
columns.push(table_types)
|
||||
}
|
||||
|
||||
let records = RecordBatches::try_from_columns(schema, columns)
|
||||
.context(error::CreateRecordBatchSnafu)?;
|
||||
Ok(Output::RecordBatches(records))
|
||||
}
|
||||
ShowKind::Where(filter) => {
|
||||
let mut columns = vec![Arc::new(StringVector::from(tables)) as _];
|
||||
if let Some(table_types) = table_types {
|
||||
columns.push(table_types)
|
||||
}
|
||||
let record_batch =
|
||||
RecordBatch::new(schema, columns).context(error::CreateRecordBatchSnafu)?;
|
||||
let result = execute_show_with_filter(record_batch, Some(filter)).await?;
|
||||
Ok(result)
|
||||
}
|
||||
ShowKind::Like(ident) => {
|
||||
let (tables, filter) = Helper::like_utf8_filter(tables, &ident.value)
|
||||
.context(error::VectorComputationSnafu)?;
|
||||
let mut columns = vec![tables];
|
||||
|
||||
if let Some(table_types) = table_types {
|
||||
let table_types = table_types
|
||||
.filter(&filter)
|
||||
.context(error::VectorComputationSnafu)?;
|
||||
columns.push(table_types)
|
||||
}
|
||||
|
||||
let records = RecordBatches::try_from_columns(schema, columns)
|
||||
.context(error::CreateRecordBatchSnafu)?;
|
||||
Ok(Output::RecordBatches(records))
|
||||
}
|
||||
}
|
||||
query_from_information_schema_table(
|
||||
query_engine,
|
||||
catalog_manager,
|
||||
query_ctx,
|
||||
TABLES,
|
||||
projects,
|
||||
filters,
|
||||
like_field,
|
||||
sort,
|
||||
stmt.kind,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
@@ -513,25 +568,6 @@ fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn
|
||||
)
|
||||
}
|
||||
|
||||
async fn get_table_types(
|
||||
tables: &[String],
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
schema_name: &str,
|
||||
) -> Result<Arc<dyn Vector>> {
|
||||
let mut table_types = Vec::with_capacity(tables.len());
|
||||
for table_name in tables {
|
||||
if let Some(table) = catalog_manager
|
||||
.table(query_ctx.current_catalog(), schema_name, table_name)
|
||||
.await
|
||||
.context(error::CatalogSnafu)?
|
||||
{
|
||||
table_types.push(table.table_type().to_string());
|
||||
}
|
||||
}
|
||||
Ok(Arc::new(StringVector::from(table_types)) as _)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -440,39 +440,27 @@ async fn test_execute_query(instance: Arc<dyn MockInstance>) {
|
||||
async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
|
||||
let instance = instance.frontend();
|
||||
|
||||
let expected = "\
|
||||
+--------------------+
|
||||
| Database |
|
||||
+--------------------+
|
||||
| greptime_private |
|
||||
| information_schema |
|
||||
| public |
|
||||
+--------------------+\
|
||||
";
|
||||
let output = execute_sql(&instance, "show databases").await;
|
||||
match output {
|
||||
Output::RecordBatches(databases) => {
|
||||
let databases = databases.take();
|
||||
assert_eq!(1, databases[0].num_columns());
|
||||
assert_eq!(databases[0].column(0).len(), 3);
|
||||
|
||||
assert_eq!(
|
||||
*databases[0].column(0),
|
||||
Arc::new(StringVector::from(vec![
|
||||
Some("greptime_private"),
|
||||
Some("information_schema"),
|
||||
Some("public")
|
||||
])) as VectorRef
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
check_unordered_output_stream(output, expected).await;
|
||||
|
||||
let output = execute_sql(&instance, "show databases like '%bl%'").await;
|
||||
match output {
|
||||
Output::RecordBatches(databases) => {
|
||||
let databases = databases.take();
|
||||
assert_eq!(1, databases[0].num_columns());
|
||||
assert_eq!(databases[0].column(0).len(), 1);
|
||||
|
||||
assert_eq!(
|
||||
*databases[0].column(0),
|
||||
Arc::new(StringVector::from(vec![Some("public")])) as VectorRef
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
let expected = "\
|
||||
+----------+
|
||||
| Database |
|
||||
+----------+
|
||||
| public |
|
||||
+----------+\
|
||||
";
|
||||
check_unordered_output_stream(output, expected).await;
|
||||
|
||||
let expected = "\
|
||||
+---------+
|
||||
@@ -500,21 +488,41 @@ async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
|
||||
";
|
||||
check_unordered_output_stream(output, expected).await;
|
||||
|
||||
let output = execute_sql(&instance, "SHOW FULL TABLES WHERE Table_type != 'VIEW'").await;
|
||||
let expected = "\
|
||||
+---------+-----------------+
|
||||
| Tables | Table_type |
|
||||
+---------+-----------------+
|
||||
| demo | BASE TABLE |
|
||||
| numbers | LOCAL TEMPORARY |
|
||||
+---------+-----------------+\
|
||||
";
|
||||
check_unordered_output_stream(output, expected).await;
|
||||
|
||||
let output = execute_sql(
|
||||
&instance,
|
||||
"SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'",
|
||||
)
|
||||
.await;
|
||||
let expected = "\
|
||||
+--------+------------+
|
||||
| Tables | Table_type |
|
||||
+--------+------------+
|
||||
| demo | BASE TABLE |
|
||||
+--------+------------+\
|
||||
";
|
||||
check_unordered_output_stream(output, expected).await;
|
||||
|
||||
// show tables like [string]
|
||||
let output = execute_sql(&instance, "show tables like 'de%'").await;
|
||||
match output {
|
||||
Output::RecordBatches(databases) => {
|
||||
let databases = databases.take();
|
||||
assert_eq!(1, databases[0].num_columns());
|
||||
assert_eq!(databases[0].column(0).len(), 1);
|
||||
|
||||
assert_eq!(
|
||||
*databases[0].column(0),
|
||||
Arc::new(StringVector::from(vec![Some("demo")])) as VectorRef
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
let expected = "\
|
||||
+--------+
|
||||
| Tables |
|
||||
+--------+
|
||||
| demo |
|
||||
+--------+\
|
||||
";
|
||||
check_unordered_output_stream(output, expected).await;
|
||||
}
|
||||
|
||||
#[apply(both_instances_cases)]
|
||||
|
||||
@@ -336,7 +336,12 @@ pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str
|
||||
};
|
||||
let pretty_print = sort_table(&recordbatches.pretty_print().unwrap());
|
||||
let expected = sort_table(expected);
|
||||
assert_eq!(pretty_print, expected);
|
||||
assert_eq!(
|
||||
pretty_print,
|
||||
expected,
|
||||
"\n{}",
|
||||
recordbatches.pretty_print().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
pub fn prepare_path(p: &str) -> String {
|
||||
|
||||
@@ -13,16 +13,16 @@ Affected Rows: 1
|
||||
SHOW DATABASES LIKE '%public%';
|
||||
|
||||
+--------------------+
|
||||
| Schemas |
|
||||
| Database |
|
||||
+--------------------+
|
||||
| public |
|
||||
| test_public_schema |
|
||||
+--------------------+
|
||||
|
||||
SHOW DATABASES WHERE Schemas='test_public_schema';
|
||||
SHOW DATABASES WHERE Database = 'test_public_schema';
|
||||
|
||||
+--------------------+
|
||||
| Schemas |
|
||||
| Database |
|
||||
+--------------------+
|
||||
| test_public_schema |
|
||||
+--------------------+
|
||||
@@ -81,6 +81,14 @@ SHOW TABLES;
|
||||
| hello |
|
||||
+--------+
|
||||
|
||||
SHOW FULL TABLES WHERE Table_type != 'VIEW';
|
||||
|
||||
+--------+------------+
|
||||
| Tables | Table_type |
|
||||
+--------+------------+
|
||||
| hello | BASE TABLE |
|
||||
+--------+------------+
|
||||
|
||||
DROP TABLE hello;
|
||||
|
||||
Affected Rows: 0
|
||||
@@ -91,10 +99,8 @@ Error: 4001(TableNotFound), Table not found: greptime.test_public_schema.hello
|
||||
|
||||
SHOW TABLES FROM test_public_schema;
|
||||
|
||||
+--------+
|
||||
| Tables |
|
||||
+--------+
|
||||
+--------+
|
||||
++
|
||||
++
|
||||
|
||||
SHOW TABLES FROM public;
|
||||
|
||||
@@ -104,7 +110,7 @@ SHOW TABLES FROM public;
|
||||
| numbers |
|
||||
+---------+
|
||||
|
||||
SHOW TABLES FROM public WHERE Tables='numbers';
|
||||
SHOW TABLES FROM public WHERE Tables = 'numbers';
|
||||
|
||||
+---------+
|
||||
| Tables |
|
||||
|
||||
@@ -6,7 +6,7 @@ CREATE SCHEMA IF NOT EXISTS test_public_schema;
|
||||
|
||||
SHOW DATABASES LIKE '%public%';
|
||||
|
||||
SHOW DATABASES WHERE Schemas='test_public_schema';
|
||||
SHOW DATABASES WHERE Database = 'test_public_schema';
|
||||
|
||||
USE test_public_schema;
|
||||
|
||||
@@ -26,6 +26,8 @@ SELECT * FROM hello;
|
||||
|
||||
SHOW TABLES;
|
||||
|
||||
SHOW FULL TABLES WHERE Table_type != 'VIEW';
|
||||
|
||||
DROP TABLE hello;
|
||||
|
||||
DROP TABLE hello;
|
||||
@@ -34,7 +36,7 @@ SHOW TABLES FROM test_public_schema;
|
||||
|
||||
SHOW TABLES FROM public;
|
||||
|
||||
SHOW TABLES FROM public WHERE Tables='numbers';
|
||||
SHOW TABLES FROM public WHERE Tables = 'numbers';
|
||||
|
||||
DROP SCHEMA test_public_schema;
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ Error: 1002(Unexpected), Unexpected, violated: Invalid database name: ㊙️data
|
||||
show databases;
|
||||
|
||||
+--------------------+
|
||||
| Schemas |
|
||||
| Database |
|
||||
+--------------------+
|
||||
| greptime_private |
|
||||
| illegal-database |
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
show databases;
|
||||
|
||||
+-----------------------+
|
||||
| Schemas |
|
||||
| Database |
|
||||
+-----------------------+
|
||||
| greptime_private |
|
||||
| illegal-database |
|
||||
|
||||
Reference in New Issue
Block a user