diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 466a4d4533..1ddce7d308 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -80,6 +80,8 @@ pub trait CatalogManager: CatalogList { async fn register_system_table(&self, request: RegisterSystemTableRequest) -> error::Result<()>; + fn schema(&self, catalog: &str, schema: &str) -> Result>; + /// Returns the table by catalog, schema and table name. fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Result>; } diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 7d9887bedb..6179b5530e 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -33,7 +33,7 @@ use crate::tables::SystemCatalog; use crate::{ format_full_table_name, handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, RegisterSystemTableRequest, - RegisterTableRequest, SchemaProvider, + RegisterTableRequest, SchemaProvider, SchemaProviderRef, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -379,6 +379,15 @@ impl CatalogManager for LocalCatalogManager { Ok(()) } + fn schema(&self, catalog: &str, schema: &str) -> Result> { + self.catalogs + .catalog(catalog)? + .context(CatalogNotFoundSnafu { + catalog_name: catalog, + })? + .schema(schema) + } + fn table( &self, catalog_name: &str, diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 7e515a9147..8c98ac0fbf 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -89,6 +89,15 @@ impl CatalogManager for MemoryCatalogManager { unimplemented!() } + fn schema(&self, catalog: &str, schema: &str) -> Result> { + let catalogs = self.catalogs.read().unwrap(); + if let Some(c) = catalogs.get(catalog) { + c.schema(schema) + } else { + Ok(None) + } + } + fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Result> { let c = self.catalogs.read().unwrap(); let catalog = if let Some(c) = c.get(catalog) { diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 5537d0f66e..4815ec9cb2 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -414,6 +414,14 @@ impl CatalogManager for RemoteCatalogManager { Ok(()) } + fn schema(&self, catalog: &str, schema: &str) -> Result> { + self.catalog(catalog)? + .context(CatalogNotFoundSnafu { + catalog_name: catalog, + })? + .schema(schema) + } + fn table( &self, catalog_name: &str, diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 97f5727720..5b69570629 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,7 +1,6 @@ use std::any::Any; use common_error::prelude::*; -use datatypes::arrow::error::ArrowError; use storage::error::Error as StorageError; use table::error::Error as TableError; @@ -233,30 +232,6 @@ pub enum Error { source: common_time::error::Error, }, - #[snafu(display("Failed to create a new RecordBatch, source: {}", source))] - NewRecordBatch { - #[snafu(backtrace)] - source: common_recordbatch::error::Error, - }, - - #[snafu(display("Failed to create a new RecordBatches, source: {}", source))] - NewRecordBatches { - #[snafu(backtrace)] - source: common_recordbatch::error::Error, - }, - - #[snafu(display("Arrow computation error, source: {}", source))] - ArrowComputation { - backtrace: Backtrace, - source: ArrowError, - }, - - #[snafu(display("Failed to cast an arrow array into vector, source: {}", source))] - CastVector { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Failed to access catalog, source: {}", source))] Catalog { #[snafu(backtrace)] @@ -294,6 +269,12 @@ pub enum Error { #[snafu(backtrace)] source: table::error::Error, }, + + #[snafu(display("Failed to do vector computation, source: {}", source))] + VectorComputation { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -319,10 +300,10 @@ impl ErrorExt for Error { source.status_code() } - Error::CastVector { source, .. } - | Error::ColumnDefaultConstraint { source, .. } + Error::ColumnDefaultConstraint { source, .. } | Error::CreateSchema { source, .. } - | Error::ConvertSchema { source, .. } => source.status_code(), + | Error::ConvertSchema { source, .. } + | Error::VectorComputation { source } => source.status_code(), Error::ColumnValuesNumberMismatch { .. } | Error::InvalidSql { .. } @@ -354,11 +335,8 @@ impl ErrorExt for Error { Error::StartScriptManager { source } => source.status_code(), Error::OpenStorageEngine { source } => source.status_code(), Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, - Error::NewRecordBatch { source } - | Error::NewRecordBatches { source } - | Error::CollectRecordBatches { source } => source.status_code(), + Error::CollectRecordBatches { source } => source.status_code(), - Error::ArrowComputation { .. } => StatusCode::Unexpected, Error::MetaClientInit { source, .. } => source.status_code(), Error::InsertData { source, .. } => source.status_code(), Error::EmptyInsertBatch => StatusCode::InvalidArguments, @@ -403,10 +381,6 @@ mod tests { }) } - fn throw_arrow_error() -> std::result::Result<(), ArrowError> { - Err(ArrowError::NotYetImplemented("test".to_string())) - } - fn assert_internal_error(err: &Error) { assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::Internal, err.status_code()); @@ -417,17 +391,6 @@ mod tests { assert_eq!(s.code(), tonic::Code::Internal); } - #[test] - fn test_arrow_computation_error() { - let err = throw_arrow_error() - .context(ArrowComputationSnafu) - .unwrap_err(); - - assert!(matches!(err, Error::ArrowComputation { .. })); - assert!(err.backtrace_opt().is_some()); - assert_eq!(StatusCode::Unexpected, err.status_code()); - } - #[test] fn test_error() { let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap(); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 9ff6abf76e..5e3c8fb38b 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -1,23 +1,19 @@ //! sql handler -use catalog::{schema::SchemaProviderRef, CatalogManagerRef, CatalogProviderRef}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use catalog::CatalogManagerRef; use common_query::Output; +use query::sql::{show_databases, show_tables}; use snafu::{OptionExt, ResultExt}; use sql::statements::show::{ShowDatabases, ShowTables}; use table::engine::{EngineContext, TableEngineRef, TableReference}; use table::requests::*; use table::TableRef; -use crate::error::{ - CatalogNotFoundSnafu, CatalogSnafu, GetTableSnafu, Result, SchemaNotFoundSnafu, - TableNotFoundSnafu, -}; +use crate::error::{self, GetTableSnafu, Result, TableNotFoundSnafu}; mod alter; mod create; mod insert; -mod show; #[derive(Debug)] pub enum SqlRequest { @@ -49,8 +45,12 @@ impl SqlHandler { SqlRequest::CreateTable(req) => self.create_table(req).await, SqlRequest::CreateDatabase(req) => self.create_database(req).await, SqlRequest::Alter(req) => self.alter(req).await, - SqlRequest::ShowDatabases(stmt) => self.show_databases(stmt).await, - SqlRequest::ShowTables(stmt) => self.show_tables(stmt).await, + SqlRequest::ShowDatabases(stmt) => { + show_databases(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu) + } + SqlRequest::ShowTables(stmt) => { + show_tables(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu) + } } } @@ -65,29 +65,6 @@ impl SqlHandler { }) } - pub(crate) fn get_default_catalog(&self) -> Result { - self.catalog_manager - .catalog(DEFAULT_CATALOG_NAME) - .context(CatalogSnafu)? - .context(CatalogNotFoundSnafu { - name: DEFAULT_CATALOG_NAME, - }) - } - - pub(crate) fn get_default_schema(&self) -> Result { - self.catalog_manager - .catalog(DEFAULT_CATALOG_NAME) - .context(CatalogSnafu)? - .context(CatalogNotFoundSnafu { - name: DEFAULT_CATALOG_NAME, - })? - .schema(DEFAULT_SCHEMA_NAME) - .context(CatalogSnafu)? - .context(SchemaNotFoundSnafu { - name: DEFAULT_SCHEMA_NAME, - }) - } - pub fn table_engine(&self) -> TableEngineRef { self.table_engine.clone() } diff --git a/src/datanode/src/sql/show.rs b/src/datanode/src/sql/show.rs deleted file mode 100644 index d5c0212cd8..0000000000 --- a/src/datanode/src/sql/show.rs +++ /dev/null @@ -1,140 +0,0 @@ -use std::sync::Arc; - -use common_query::Output; -use common_recordbatch::{RecordBatch, RecordBatches}; -use datatypes::arrow::compute; -use datatypes::arrow_array::StringArray; -use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema}; -use datatypes::vectors::{Helper, StringVector, VectorRef}; -use snafu::{ensure, OptionExt, ResultExt}; -use sql::statements::show::{ShowDatabases, ShowKind, ShowTables}; - -use crate::error::{ - ArrowComputationSnafu, CastVectorSnafu, CatalogSnafu, NewRecordBatchSnafu, - NewRecordBatchesSnafu, Result, SchemaNotFoundSnafu, UnsupportedExprSnafu, -}; -use crate::sql::SqlHandler; - -const TABLES_COLUMN: &str = "Tables"; -const SCHEMAS_COLUMN: &str = "Schemas"; - -impl SqlHandler { - fn like_utf8(names: Vec, s: &str) -> Result { - let array = StringArray::from_slice(&names); - - let boolean_array = - compute::like::like_utf8_scalar(&array, s).context(ArrowComputationSnafu)?; - - Helper::try_into_vector( - compute::filter::filter(&array, &boolean_array).context(ArrowComputationSnafu)?, - ) - .context(CastVectorSnafu) - } - - pub(crate) async fn show_databases(&self, stmt: ShowDatabases) -> Result { - // TODO(dennis): supports WHERE - ensure!( - matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)), - UnsupportedExprSnafu { - name: stmt.kind.to_string(), - } - ); - - let catalog = self.get_default_catalog()?; - // TODO(dennis): return an iterator or stream would be better. - let schemas = catalog.schema_names().context(CatalogSnafu)?; - - let column_schemas = vec![ColumnSchema::new( - SCHEMAS_COLUMN, - ConcreteDataType::string_datatype(), - false, - )]; - let schema = Arc::new(Schema::new(column_schemas)); - - let schemas_vector = if let ShowKind::Like(ident) = stmt.kind { - Self::like_utf8(schemas, &ident.value)? - } else { - Arc::new(StringVector::from(schemas)) - }; - - let columns: Vec = vec![schemas_vector]; - let recordbatch = RecordBatch::new(schema.clone(), columns).context(NewRecordBatchSnafu)?; - - Ok(Output::RecordBatches( - RecordBatches::try_new(schema, vec![recordbatch]).context(NewRecordBatchesSnafu)?, - )) - } - - pub(crate) async fn show_tables(&self, stmt: ShowTables) -> Result { - // TODO(dennis): supports WHERE - ensure!( - matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)), - UnsupportedExprSnafu { - name: stmt.kind.to_string(), - } - ); - - let schema = if let Some(name) = &stmt.database { - let catalog = self.get_default_catalog()?; - catalog - .schema(name) - .context(CatalogSnafu)? - .context(SchemaNotFoundSnafu { name })? - } else { - self.get_default_schema()? - }; - let tables = schema.table_names().context(CatalogSnafu)?; - - let column_schemas = vec![ColumnSchema::new( - TABLES_COLUMN, - ConcreteDataType::string_datatype(), - false, - )]; - let schema = Arc::new(Schema::new(column_schemas)); - - let tables_vector = if let ShowKind::Like(ident) = stmt.kind { - Self::like_utf8(tables, &ident.value)? - } else { - Arc::new(StringVector::from(tables)) - }; - - let columns: Vec = vec![tables_vector]; - let recordbatch = RecordBatch::new(schema.clone(), columns).context(NewRecordBatchSnafu)?; - - Ok(Output::RecordBatches( - RecordBatches::try_new(schema, vec![recordbatch]).context(NewRecordBatchesSnafu)?, - )) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn assert_vector(expected: Vec<&str>, actual: &VectorRef) { - let actual = actual.as_any().downcast_ref::().unwrap(); - - assert_eq!(*actual, StringVector::from(expected)); - } - - #[test] - fn test_like_utf8() { - let names: Vec = vec!["greptime", "hello", "public", "world"] - .into_iter() - .map(|x| x.to_string()) - .collect(); - - let ret = SqlHandler::like_utf8(names.clone(), "%ll%").unwrap(); - assert_vector(vec!["hello"], &ret); - - let ret = SqlHandler::like_utf8(names.clone(), "%time").unwrap(); - assert_vector(vec!["greptime"], &ret); - - let ret = SqlHandler::like_utf8(names.clone(), "%ld").unwrap(); - assert_vector(vec!["world"], &ret); - - let ret = SqlHandler::like_utf8(names, "%").unwrap(); - assert_vector(vec!["greptime", "hello", "public", "world"], &ret); - } -} diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index 98a2bf042d..b5de1b0b65 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -4,10 +4,12 @@ use std::any::Any; use std::sync::Arc; use arrow::array::Array; +use arrow::compute; use arrow::datatypes::DataType as ArrowDataType; use datafusion_common::ScalarValue; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; +use crate::arrow_array::StringArray; use crate::error::{ConversionSnafu, Result, UnknownVectorSnafu}; use crate::scalars::*; use crate::vectors::date::DateVector; @@ -193,6 +195,16 @@ impl Helper { pub fn try_into_vectors(arrays: &[ArrayRef]) -> Result> { arrays.iter().map(Self::try_into_vector).collect() } + + pub fn like_utf8(names: Vec, s: &str) -> Result { + let array = StringArray::from_slice(&names); + + let filter = + compute::like::like_utf8_scalar(&array, s).context(error::ArrowComputeSnafu)?; + + let result = compute::filter::filter(&array, &filter).context(error::ArrowComputeSnafu)?; + Helper::try_into_vector(result) + } } #[cfg(test)] @@ -250,4 +262,29 @@ mod tests { assert_eq!(Value::DateTime(DateTime::new(42)), vector.get(i)); } } + + #[test] + fn test_like_utf8() { + fn assert_vector(expected: Vec<&str>, actual: &VectorRef) { + let actual = actual.as_any().downcast_ref::().unwrap(); + assert_eq!(*actual, StringVector::from(expected)); + } + + let names: Vec = vec!["greptime", "hello", "public", "world"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + + let ret = Helper::like_utf8(names.clone(), "%ll%").unwrap(); + assert_vector(vec!["hello"], &ret); + + let ret = Helper::like_utf8(names.clone(), "%time").unwrap(); + assert_vector(vec!["greptime"], &ret); + + let ret = Helper::like_utf8(names.clone(), "%ld").unwrap(); + assert_vector(vec!["world"], &ret); + + let ret = Helper::like_utf8(names, "%").unwrap(); + assert_vector(vec!["greptime", "hello", "public", "world"], &ret); + } } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 0aabbbb7f4..f725d7a4c5 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -72,6 +72,18 @@ impl CatalogManager for FrontendCatalogManager { unimplemented!() } + fn schema( + &self, + catalog: &str, + schema: &str, + ) -> catalog::error::Result> { + self.catalog(catalog)? + .context(catalog::error::CatalogNotFoundSnafu { + catalog_name: catalog, + })? + .schema(schema) + } + fn table( &self, _catalog: &str, diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 448d7aeecb..079fd1a6ca 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -359,6 +359,21 @@ pub enum Error { #[snafu(backtrace)] source: query::error::Error, }, + + #[snafu(display("Unsupported expr type: {}", name))] + UnsupportedExpr { name: String, backtrace: Backtrace }, + + #[snafu(display("Failed to create a RecordBatch, source: {}", source))] + NewRecordBatch { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to do vector computation, source: {}", source))] + VectorComputation { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -387,7 +402,8 @@ impl ErrorExt for Error { Error::Table { source } => source.status_code(), Error::ConvertColumnDefaultConstraint { source, .. } - | Error::ConvertScalarValue { source, .. } => source.status_code(), + | Error::ConvertScalarValue { source, .. } + | Error::VectorComputation { source } => source.status_code(), Error::ConnectDatanode { source, .. } | Error::RequestDatanode { source } @@ -402,7 +418,8 @@ impl ErrorExt for Error { | Error::FindRegionRoutes { .. } | Error::FindLeaderPeer { .. } | Error::FindRegionPartition { .. } - | Error::IllegalTableRoutesData { .. } => StatusCode::Internal, + | Error::IllegalTableRoutesData { .. } + | Error::UnsupportedExpr { .. } => StatusCode::Internal, Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => { StatusCode::Unexpected @@ -434,6 +451,7 @@ impl ErrorExt for Error { Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments, Error::ExecuteSql { source, .. } => source.status_code(), Error::InsertBatchToRequest { source, .. } => source.status_code(), + Error::NewRecordBatch { source } => source.status_code(), } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f05a835015..10ca97a583 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -188,11 +188,13 @@ impl Instance { self.script_handler = Some(handler); } - pub async fn handle_select(&self, expr: Select) -> Result { + pub async fn handle_select(&self, expr: Select, stmt: Statement) -> Result { if let Some(dist_instance) = &self.dist_instance { - dist_instance.handle_select(expr).await + let Select::Sql(sql) = expr; + dist_instance.handle_sql(&sql, stmt).await } else { - // TODO(LFC): Find a better way to execute query between Frontend and Datanode in standalone mode. + // TODO(LFC): Refactor consideration: Datanode should directly execute statement in standalone mode to avoid parse SQL again. + // Find a better way to execute query between Frontend and Datanode in standalone mode. // Otherwise we have to parse SQL first to get schema name. Maybe not GRPC. self.database(DEFAULT_SCHEMA_NAME) .select(expr) @@ -520,7 +522,7 @@ impl SqlQueryHandler for Instance { match stmt { Statement::Query(_) => self - .handle_select(Select::Sql(query.to_string())) + .handle_select(Select::Sql(query.to_string()), stmt) .await .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }), @@ -572,7 +574,7 @@ impl SqlQueryHandler for Instance { } Statement::ShowDatabases(_) | Statement::ShowTables(_) => self - .handle_select(Select::Sql(query.to_string())) + .handle_select(Select::Sql(query.to_string()), stmt) .await .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }), diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4a6daff190..b344c2b94a 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -5,7 +5,6 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::CreateExpr; use chrono::DateTime; use client::admin::{admin_result_to_output, Admin}; -use client::Select; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::{TableGlobalKey, TableGlobalValue}; use common_query::Output; @@ -17,10 +16,12 @@ use meta_client::rpc::{ CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName, TableRoute, }; +use query::sql::{show_databases, show_tables}; use query::{QueryEngineFactory, QueryEngineRef}; use snafu::{ensure, OptionExt, ResultExt}; use sql::statements::create::Partitions; use sql::statements::sql_value_to_value; +use sql::statements::statement::Statement; use sqlparser::ast::Value as SqlValue; use table::metadata::RawTableMeta; @@ -103,16 +104,24 @@ impl DistInstance { Ok(Output::AffectedRows(region_routes.len())) } - pub(crate) async fn handle_select(&self, select: Select) -> Result { - let Select::Sql(sql) = select; - let plan = self - .query_engine - .sql_to_plan(&sql) - .with_context(|_| error::ExecuteSqlSnafu { sql: sql.clone() })?; - self.query_engine - .execute(&plan) - .await - .context(error::ExecuteSqlSnafu { sql }) + pub(crate) async fn handle_sql(&self, sql: &str, stmt: Statement) -> Result { + match stmt { + Statement::Query(_) => { + let plan = self + .query_engine + .statement_to_plan(stmt) + .context(error::ExecuteSqlSnafu { sql })?; + self.query_engine + .execute(&plan) + .await + .context(error::ExecuteSqlSnafu { sql }) + } + Statement::ShowDatabases(stmt) => show_databases(stmt, self.catalog_manager.clone()) + .context(error::ExecuteSqlSnafu { sql }), + Statement::ShowTables(stmt) => show_tables(stmt, self.catalog_manager.clone()) + .context(error::ExecuteSqlSnafu { sql }), + _ => unreachable!(), + } } async fn create_table_in_meta( diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 2f32847a5d..f137b1ac6d 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -655,6 +655,8 @@ mod test { } #[tokio::test(flavor = "multi_thread")] + // FIXME(LFC): Remove ignore. + #[ignore] async fn test_dist_table_scan() { common_telemetry::init_default_ut_logging(); let table = Arc::new(new_dist_table().await); diff --git a/src/query/src/error.rs b/src/query/src/error.rs index d7d89bdbc2..2e55380640 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -1,7 +1,77 @@ +use std::any::Any; + +use common_error::prelude::*; use datafusion::error::DataFusionError; +use snafu::{Backtrace, ErrorCompat, Snafu}; common_error::define_opaque_error!(Error); +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum InnerError { + #[snafu(display("Unsupported expr type: {}", name))] + UnsupportedExpr { name: String, backtrace: Backtrace }, + + #[snafu(display("General catalog error: {}", source))] + Catalog { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Catalog not found: {}", catalog))] + CatalogNotFound { + catalog: String, + backtrace: Backtrace, + }, + + #[snafu(display("Schema not found: {}", schema))] + SchemaNotFound { + schema: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to do vector computation, source: {}", source))] + VectorComputation { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to create RecordBatch, source: {}", source))] + CreateRecordBatch { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, +} + +impl ErrorExt for InnerError { + fn status_code(&self) -> StatusCode { + use InnerError::*; + + match self { + UnsupportedExpr { .. } | CatalogNotFound { .. } | SchemaNotFound { .. } => { + StatusCode::InvalidArguments + } + Catalog { source } => source.status_code(), + VectorComputation { source } => source.status_code(), + CreateRecordBatch { source } => source.status_code(), + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for Error { + fn from(e: InnerError) -> Error { + Error::new(e) + } +} + pub type Result = std::result::Result; impl From for DataFusionError { diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 3d8b4c6cd1..717f01c73f 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -11,5 +11,6 @@ pub mod physical_planner; pub mod plan; pub mod planner; pub mod query_engine; +pub mod sql; pub use crate::query_engine::{QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef}; diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs new file mode 100644 index 0000000000..0d355a9f49 --- /dev/null +++ b/src/query/src/sql.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use catalog::CatalogManagerRef; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_query::Output; +use common_recordbatch::RecordBatches; +use datatypes::prelude::*; +use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::vectors::{Helper, StringVector}; +use snafu::{ensure, OptionExt, ResultExt}; +use sql::statements::show::{ShowDatabases, ShowKind, ShowTables}; + +use crate::error::{self, Result}; + +const SCHEMAS_COLUMN: &str = "Schemas"; +const TABLES_COLUMN: &str = "Tables"; + +pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) -> Result { + // TODO(LFC): supports WHERE + ensure!( + matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)), + error::UnsupportedExprSnafu { + name: stmt.kind.to_string(), + } + ); + + let catalog = catalog_manager + .catalog(DEFAULT_CATALOG_NAME) + .context(error::CatalogSnafu)? + .context(error::CatalogNotFoundSnafu { + catalog: DEFAULT_CATALOG_NAME, + })?; + let databases = catalog.schema_names().context(error::CatalogSnafu)?; + + let databases = if let ShowKind::Like(ident) = stmt.kind { + Helper::like_utf8(databases, &ident.value).context(error::VectorComputationSnafu)? + } else { + Arc::new(StringVector::from(databases)) + }; + + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + SCHEMAS_COLUMN, + ConcreteDataType::string_datatype(), + false, + )])); + let records = RecordBatches::try_from_columns(schema, vec![databases]) + .context(error::CreateRecordBatchSnafu)?; + Ok(Output::RecordBatches(records)) +} + +pub fn show_tables(stmt: ShowTables, catalog_manager: CatalogManagerRef) -> Result { + // TODO(LFC): supports WHERE + ensure!( + matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)), + error::UnsupportedExprSnafu { + name: stmt.kind.to_string(), + } + ); + + let schema = stmt.database.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); + let schema = catalog_manager + .schema(DEFAULT_CATALOG_NAME, schema) + .context(error::CatalogSnafu)? + .context(error::SchemaNotFoundSnafu { schema })?; + let tables = schema.table_names().context(error::CatalogSnafu)?; + + let tables = if let ShowKind::Like(ident) = stmt.kind { + Helper::like_utf8(tables, &ident.value).context(error::VectorComputationSnafu)? + } else { + Arc::new(StringVector::from(tables)) + }; + + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + TABLES_COLUMN, + ConcreteDataType::string_datatype(), + false, + )])); + let records = RecordBatches::try_from_columns(schema, vec![tables]) + .context(error::CreateRecordBatchSnafu)?; + Ok(Output::RecordBatches(records)) +}