mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: Frontend show tables and databases (#504)
* feat: Frontend show tables and databases Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
@@ -80,6 +80,8 @@ pub trait CatalogManager: CatalogList {
|
||||
async fn register_system_table(&self, request: RegisterSystemTableRequest)
|
||||
-> error::Result<()>;
|
||||
|
||||
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>>;
|
||||
|
||||
/// Returns the table by catalog, schema and table name.
|
||||
fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Result<Option<TableRef>>;
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::tables::SystemCatalog;
|
||||
use crate::{
|
||||
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
|
||||
CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, SchemaProvider,
|
||||
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
|
||||
@@ -379,6 +379,15 @@ impl CatalogManager for LocalCatalogManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
|
||||
self.catalogs
|
||||
.catalog(catalog)?
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: catalog,
|
||||
})?
|
||||
.schema(schema)
|
||||
}
|
||||
|
||||
fn table(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
|
||||
@@ -89,6 +89,15 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
|
||||
let catalogs = self.catalogs.read().unwrap();
|
||||
if let Some(c) = catalogs.get(catalog) {
|
||||
c.schema(schema)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Result<Option<TableRef>> {
|
||||
let c = self.catalogs.read().unwrap();
|
||||
let catalog = if let Some(c) = c.get(catalog) {
|
||||
|
||||
@@ -414,6 +414,14 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
|
||||
self.catalog(catalog)?
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: catalog,
|
||||
})?
|
||||
.schema(schema)
|
||||
}
|
||||
|
||||
fn table(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::prelude::*;
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use storage::error::Error as StorageError;
|
||||
use table::error::Error as TableError;
|
||||
|
||||
@@ -233,30 +232,6 @@ pub enum Error {
|
||||
source: common_time::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create a new RecordBatch, source: {}", source))]
|
||||
NewRecordBatch {
|
||||
#[snafu(backtrace)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create a new RecordBatches, source: {}", source))]
|
||||
NewRecordBatches {
|
||||
#[snafu(backtrace)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Arrow computation error, source: {}", source))]
|
||||
ArrowComputation {
|
||||
backtrace: Backtrace,
|
||||
source: ArrowError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to cast an arrow array into vector, source: {}", source))]
|
||||
CastVector {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to access catalog, source: {}", source))]
|
||||
Catalog {
|
||||
#[snafu(backtrace)]
|
||||
@@ -294,6 +269,12 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to do vector computation, source: {}", source))]
|
||||
VectorComputation {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -319,10 +300,10 @@ impl ErrorExt for Error {
|
||||
source.status_code()
|
||||
}
|
||||
|
||||
Error::CastVector { source, .. }
|
||||
| Error::ColumnDefaultConstraint { source, .. }
|
||||
Error::ColumnDefaultConstraint { source, .. }
|
||||
| Error::CreateSchema { source, .. }
|
||||
| Error::ConvertSchema { source, .. } => source.status_code(),
|
||||
| Error::ConvertSchema { source, .. }
|
||||
| Error::VectorComputation { source } => source.status_code(),
|
||||
|
||||
Error::ColumnValuesNumberMismatch { .. }
|
||||
| Error::InvalidSql { .. }
|
||||
@@ -354,11 +335,8 @@ impl ErrorExt for Error {
|
||||
Error::StartScriptManager { source } => source.status_code(),
|
||||
Error::OpenStorageEngine { source } => source.status_code(),
|
||||
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
Error::NewRecordBatch { source }
|
||||
| Error::NewRecordBatches { source }
|
||||
| Error::CollectRecordBatches { source } => source.status_code(),
|
||||
Error::CollectRecordBatches { source } => source.status_code(),
|
||||
|
||||
Error::ArrowComputation { .. } => StatusCode::Unexpected,
|
||||
Error::MetaClientInit { source, .. } => source.status_code(),
|
||||
Error::InsertData { source, .. } => source.status_code(),
|
||||
Error::EmptyInsertBatch => StatusCode::InvalidArguments,
|
||||
@@ -403,10 +381,6 @@ mod tests {
|
||||
})
|
||||
}
|
||||
|
||||
fn throw_arrow_error() -> std::result::Result<(), ArrowError> {
|
||||
Err(ArrowError::NotYetImplemented("test".to_string()))
|
||||
}
|
||||
|
||||
fn assert_internal_error(err: &Error) {
|
||||
assert!(err.backtrace_opt().is_some());
|
||||
assert_eq!(StatusCode::Internal, err.status_code());
|
||||
@@ -417,17 +391,6 @@ mod tests {
|
||||
assert_eq!(s.code(), tonic::Code::Internal);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_arrow_computation_error() {
|
||||
let err = throw_arrow_error()
|
||||
.context(ArrowComputationSnafu)
|
||||
.unwrap_err();
|
||||
|
||||
assert!(matches!(err, Error::ArrowComputation { .. }));
|
||||
assert!(err.backtrace_opt().is_some());
|
||||
assert_eq!(StatusCode::Unexpected, err.status_code());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_error() {
|
||||
let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap();
|
||||
|
||||
@@ -1,23 +1,19 @@
|
||||
//! sql handler
|
||||
|
||||
use catalog::{schema::SchemaProviderRef, CatalogManagerRef, CatalogProviderRef};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_query::Output;
|
||||
use query::sql::{show_databases, show_tables};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use sql::statements::show::{ShowDatabases, ShowTables};
|
||||
use table::engine::{EngineContext, TableEngineRef, TableReference};
|
||||
use table::requests::*;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, CatalogSnafu, GetTableSnafu, Result, SchemaNotFoundSnafu,
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use crate::error::{self, GetTableSnafu, Result, TableNotFoundSnafu};
|
||||
|
||||
mod alter;
|
||||
mod create;
|
||||
mod insert;
|
||||
mod show;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SqlRequest {
|
||||
@@ -49,8 +45,12 @@ impl SqlHandler {
|
||||
SqlRequest::CreateTable(req) => self.create_table(req).await,
|
||||
SqlRequest::CreateDatabase(req) => self.create_database(req).await,
|
||||
SqlRequest::Alter(req) => self.alter(req).await,
|
||||
SqlRequest::ShowDatabases(stmt) => self.show_databases(stmt).await,
|
||||
SqlRequest::ShowTables(stmt) => self.show_tables(stmt).await,
|
||||
SqlRequest::ShowDatabases(stmt) => {
|
||||
show_databases(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
|
||||
}
|
||||
SqlRequest::ShowTables(stmt) => {
|
||||
show_tables(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,29 +65,6 @@ impl SqlHandler {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn get_default_catalog(&self) -> Result<CatalogProviderRef> {
|
||||
self.catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.context(CatalogSnafu)?
|
||||
.context(CatalogNotFoundSnafu {
|
||||
name: DEFAULT_CATALOG_NAME,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn get_default_schema(&self) -> Result<SchemaProviderRef> {
|
||||
self.catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.context(CatalogSnafu)?
|
||||
.context(CatalogNotFoundSnafu {
|
||||
name: DEFAULT_CATALOG_NAME,
|
||||
})?
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
.context(CatalogSnafu)?
|
||||
.context(SchemaNotFoundSnafu {
|
||||
name: DEFAULT_SCHEMA_NAME,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn table_engine(&self) -> TableEngineRef {
|
||||
self.table_engine.clone()
|
||||
}
|
||||
|
||||
@@ -1,140 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::Output;
|
||||
use common_recordbatch::{RecordBatch, RecordBatches};
|
||||
use datatypes::arrow::compute;
|
||||
use datatypes::arrow_array::StringArray;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::vectors::{Helper, StringVector, VectorRef};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
|
||||
|
||||
use crate::error::{
|
||||
ArrowComputationSnafu, CastVectorSnafu, CatalogSnafu, NewRecordBatchSnafu,
|
||||
NewRecordBatchesSnafu, Result, SchemaNotFoundSnafu, UnsupportedExprSnafu,
|
||||
};
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
const TABLES_COLUMN: &str = "Tables";
|
||||
const SCHEMAS_COLUMN: &str = "Schemas";
|
||||
|
||||
impl SqlHandler {
|
||||
fn like_utf8(names: Vec<String>, s: &str) -> Result<VectorRef> {
|
||||
let array = StringArray::from_slice(&names);
|
||||
|
||||
let boolean_array =
|
||||
compute::like::like_utf8_scalar(&array, s).context(ArrowComputationSnafu)?;
|
||||
|
||||
Helper::try_into_vector(
|
||||
compute::filter::filter(&array, &boolean_array).context(ArrowComputationSnafu)?,
|
||||
)
|
||||
.context(CastVectorSnafu)
|
||||
}
|
||||
|
||||
pub(crate) async fn show_databases(&self, stmt: ShowDatabases) -> Result<Output> {
|
||||
// TODO(dennis): supports WHERE
|
||||
ensure!(
|
||||
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
|
||||
UnsupportedExprSnafu {
|
||||
name: stmt.kind.to_string(),
|
||||
}
|
||||
);
|
||||
|
||||
let catalog = self.get_default_catalog()?;
|
||||
// TODO(dennis): return an iterator or stream would be better.
|
||||
let schemas = catalog.schema_names().context(CatalogSnafu)?;
|
||||
|
||||
let column_schemas = vec![ColumnSchema::new(
|
||||
SCHEMAS_COLUMN,
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
)];
|
||||
let schema = Arc::new(Schema::new(column_schemas));
|
||||
|
||||
let schemas_vector = if let ShowKind::Like(ident) = stmt.kind {
|
||||
Self::like_utf8(schemas, &ident.value)?
|
||||
} else {
|
||||
Arc::new(StringVector::from(schemas))
|
||||
};
|
||||
|
||||
let columns: Vec<VectorRef> = vec![schemas_vector];
|
||||
let recordbatch = RecordBatch::new(schema.clone(), columns).context(NewRecordBatchSnafu)?;
|
||||
|
||||
Ok(Output::RecordBatches(
|
||||
RecordBatches::try_new(schema, vec![recordbatch]).context(NewRecordBatchesSnafu)?,
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) async fn show_tables(&self, stmt: ShowTables) -> Result<Output> {
|
||||
// TODO(dennis): supports WHERE
|
||||
ensure!(
|
||||
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
|
||||
UnsupportedExprSnafu {
|
||||
name: stmt.kind.to_string(),
|
||||
}
|
||||
);
|
||||
|
||||
let schema = if let Some(name) = &stmt.database {
|
||||
let catalog = self.get_default_catalog()?;
|
||||
catalog
|
||||
.schema(name)
|
||||
.context(CatalogSnafu)?
|
||||
.context(SchemaNotFoundSnafu { name })?
|
||||
} else {
|
||||
self.get_default_schema()?
|
||||
};
|
||||
let tables = schema.table_names().context(CatalogSnafu)?;
|
||||
|
||||
let column_schemas = vec![ColumnSchema::new(
|
||||
TABLES_COLUMN,
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
)];
|
||||
let schema = Arc::new(Schema::new(column_schemas));
|
||||
|
||||
let tables_vector = if let ShowKind::Like(ident) = stmt.kind {
|
||||
Self::like_utf8(tables, &ident.value)?
|
||||
} else {
|
||||
Arc::new(StringVector::from(tables))
|
||||
};
|
||||
|
||||
let columns: Vec<VectorRef> = vec![tables_vector];
|
||||
let recordbatch = RecordBatch::new(schema.clone(), columns).context(NewRecordBatchSnafu)?;
|
||||
|
||||
Ok(Output::RecordBatches(
|
||||
RecordBatches::try_new(schema, vec![recordbatch]).context(NewRecordBatchesSnafu)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn assert_vector(expected: Vec<&str>, actual: &VectorRef) {
|
||||
let actual = actual.as_any().downcast_ref::<StringVector>().unwrap();
|
||||
|
||||
assert_eq!(*actual, StringVector::from(expected));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_like_utf8() {
|
||||
let names: Vec<String> = vec!["greptime", "hello", "public", "world"]
|
||||
.into_iter()
|
||||
.map(|x| x.to_string())
|
||||
.collect();
|
||||
|
||||
let ret = SqlHandler::like_utf8(names.clone(), "%ll%").unwrap();
|
||||
assert_vector(vec!["hello"], &ret);
|
||||
|
||||
let ret = SqlHandler::like_utf8(names.clone(), "%time").unwrap();
|
||||
assert_vector(vec!["greptime"], &ret);
|
||||
|
||||
let ret = SqlHandler::like_utf8(names.clone(), "%ld").unwrap();
|
||||
assert_vector(vec!["world"], &ret);
|
||||
|
||||
let ret = SqlHandler::like_utf8(names, "%").unwrap();
|
||||
assert_vector(vec!["greptime", "hello", "public", "world"], &ret);
|
||||
}
|
||||
}
|
||||
@@ -4,10 +4,12 @@ use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::Array;
|
||||
use arrow::compute;
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use datafusion_common::ScalarValue;
|
||||
use snafu::OptionExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::arrow_array::StringArray;
|
||||
use crate::error::{ConversionSnafu, Result, UnknownVectorSnafu};
|
||||
use crate::scalars::*;
|
||||
use crate::vectors::date::DateVector;
|
||||
@@ -193,6 +195,16 @@ impl Helper {
|
||||
pub fn try_into_vectors(arrays: &[ArrayRef]) -> Result<Vec<VectorRef>> {
|
||||
arrays.iter().map(Self::try_into_vector).collect()
|
||||
}
|
||||
|
||||
pub fn like_utf8(names: Vec<String>, s: &str) -> Result<VectorRef> {
|
||||
let array = StringArray::from_slice(&names);
|
||||
|
||||
let filter =
|
||||
compute::like::like_utf8_scalar(&array, s).context(error::ArrowComputeSnafu)?;
|
||||
|
||||
let result = compute::filter::filter(&array, &filter).context(error::ArrowComputeSnafu)?;
|
||||
Helper::try_into_vector(result)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -250,4 +262,29 @@ mod tests {
|
||||
assert_eq!(Value::DateTime(DateTime::new(42)), vector.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_like_utf8() {
|
||||
fn assert_vector(expected: Vec<&str>, actual: &VectorRef) {
|
||||
let actual = actual.as_any().downcast_ref::<StringVector>().unwrap();
|
||||
assert_eq!(*actual, StringVector::from(expected));
|
||||
}
|
||||
|
||||
let names: Vec<String> = vec!["greptime", "hello", "public", "world"]
|
||||
.into_iter()
|
||||
.map(|x| x.to_string())
|
||||
.collect();
|
||||
|
||||
let ret = Helper::like_utf8(names.clone(), "%ll%").unwrap();
|
||||
assert_vector(vec!["hello"], &ret);
|
||||
|
||||
let ret = Helper::like_utf8(names.clone(), "%time").unwrap();
|
||||
assert_vector(vec!["greptime"], &ret);
|
||||
|
||||
let ret = Helper::like_utf8(names.clone(), "%ld").unwrap();
|
||||
assert_vector(vec!["world"], &ret);
|
||||
|
||||
let ret = Helper::like_utf8(names, "%").unwrap();
|
||||
assert_vector(vec!["greptime", "hello", "public", "world"], &ret);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +72,18 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn schema(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
) -> catalog::error::Result<Option<SchemaProviderRef>> {
|
||||
self.catalog(catalog)?
|
||||
.context(catalog::error::CatalogNotFoundSnafu {
|
||||
catalog_name: catalog,
|
||||
})?
|
||||
.schema(schema)
|
||||
}
|
||||
|
||||
fn table(
|
||||
&self,
|
||||
_catalog: &str,
|
||||
|
||||
@@ -359,6 +359,21 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: query::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Unsupported expr type: {}", name))]
|
||||
UnsupportedExpr { name: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to create a RecordBatch, source: {}", source))]
|
||||
NewRecordBatch {
|
||||
#[snafu(backtrace)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to do vector computation, source: {}", source))]
|
||||
VectorComputation {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -387,7 +402,8 @@ impl ErrorExt for Error {
|
||||
Error::Table { source } => source.status_code(),
|
||||
|
||||
Error::ConvertColumnDefaultConstraint { source, .. }
|
||||
| Error::ConvertScalarValue { source, .. } => source.status_code(),
|
||||
| Error::ConvertScalarValue { source, .. }
|
||||
| Error::VectorComputation { source } => source.status_code(),
|
||||
|
||||
Error::ConnectDatanode { source, .. }
|
||||
| Error::RequestDatanode { source }
|
||||
@@ -402,7 +418,8 @@ impl ErrorExt for Error {
|
||||
| Error::FindRegionRoutes { .. }
|
||||
| Error::FindLeaderPeer { .. }
|
||||
| Error::FindRegionPartition { .. }
|
||||
| Error::IllegalTableRoutesData { .. } => StatusCode::Internal,
|
||||
| Error::IllegalTableRoutesData { .. }
|
||||
| Error::UnsupportedExpr { .. } => StatusCode::Internal,
|
||||
|
||||
Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
|
||||
StatusCode::Unexpected
|
||||
@@ -434,6 +451,7 @@ impl ErrorExt for Error {
|
||||
Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
|
||||
Error::ExecuteSql { source, .. } => source.status_code(),
|
||||
Error::InsertBatchToRequest { source, .. } => source.status_code(),
|
||||
Error::NewRecordBatch { source } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -188,11 +188,13 @@ impl Instance {
|
||||
self.script_handler = Some(handler);
|
||||
}
|
||||
|
||||
pub async fn handle_select(&self, expr: Select) -> Result<Output> {
|
||||
pub async fn handle_select(&self, expr: Select, stmt: Statement) -> Result<Output> {
|
||||
if let Some(dist_instance) = &self.dist_instance {
|
||||
dist_instance.handle_select(expr).await
|
||||
let Select::Sql(sql) = expr;
|
||||
dist_instance.handle_sql(&sql, stmt).await
|
||||
} else {
|
||||
// TODO(LFC): Find a better way to execute query between Frontend and Datanode in standalone mode.
|
||||
// TODO(LFC): Refactor consideration: Datanode should directly execute statement in standalone mode to avoid parse SQL again.
|
||||
// Find a better way to execute query between Frontend and Datanode in standalone mode.
|
||||
// Otherwise we have to parse SQL first to get schema name. Maybe not GRPC.
|
||||
self.database(DEFAULT_SCHEMA_NAME)
|
||||
.select(expr)
|
||||
@@ -520,7 +522,7 @@ impl SqlQueryHandler for Instance {
|
||||
|
||||
match stmt {
|
||||
Statement::Query(_) => self
|
||||
.handle_select(Select::Sql(query.to_string()))
|
||||
.handle_select(Select::Sql(query.to_string()), stmt)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteQuerySnafu { query }),
|
||||
@@ -572,7 +574,7 @@ impl SqlQueryHandler for Instance {
|
||||
}
|
||||
|
||||
Statement::ShowDatabases(_) | Statement::ShowTables(_) => self
|
||||
.handle_select(Select::Sql(query.to_string()))
|
||||
.handle_select(Select::Sql(query.to_string()), stmt)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteQuerySnafu { query }),
|
||||
|
||||
@@ -5,7 +5,6 @@ use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::CreateExpr;
|
||||
use chrono::DateTime;
|
||||
use client::admin::{admin_result_to_output, Admin};
|
||||
use client::Select;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::{TableGlobalKey, TableGlobalValue};
|
||||
use common_query::Output;
|
||||
@@ -17,10 +16,12 @@ use meta_client::rpc::{
|
||||
CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName,
|
||||
TableRoute,
|
||||
};
|
||||
use query::sql::{show_databases, show_tables};
|
||||
use query::{QueryEngineFactory, QueryEngineRef};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::statements::create::Partitions;
|
||||
use sql::statements::sql_value_to_value;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::Value as SqlValue;
|
||||
use table::metadata::RawTableMeta;
|
||||
|
||||
@@ -103,16 +104,24 @@ impl DistInstance {
|
||||
Ok(Output::AffectedRows(region_routes.len()))
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_select(&self, select: Select) -> Result<Output> {
|
||||
let Select::Sql(sql) = select;
|
||||
let plan = self
|
||||
.query_engine
|
||||
.sql_to_plan(&sql)
|
||||
.with_context(|_| error::ExecuteSqlSnafu { sql: sql.clone() })?;
|
||||
self.query_engine
|
||||
.execute(&plan)
|
||||
.await
|
||||
.context(error::ExecuteSqlSnafu { sql })
|
||||
pub(crate) async fn handle_sql(&self, sql: &str, stmt: Statement) -> Result<Output> {
|
||||
match stmt {
|
||||
Statement::Query(_) => {
|
||||
let plan = self
|
||||
.query_engine
|
||||
.statement_to_plan(stmt)
|
||||
.context(error::ExecuteSqlSnafu { sql })?;
|
||||
self.query_engine
|
||||
.execute(&plan)
|
||||
.await
|
||||
.context(error::ExecuteSqlSnafu { sql })
|
||||
}
|
||||
Statement::ShowDatabases(stmt) => show_databases(stmt, self.catalog_manager.clone())
|
||||
.context(error::ExecuteSqlSnafu { sql }),
|
||||
Statement::ShowTables(stmt) => show_tables(stmt, self.catalog_manager.clone())
|
||||
.context(error::ExecuteSqlSnafu { sql }),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_table_in_meta(
|
||||
|
||||
@@ -655,6 +655,8 @@ mod test {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
// FIXME(LFC): Remove ignore.
|
||||
#[ignore]
|
||||
async fn test_dist_table_scan() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let table = Arc::new(new_dist_table().await);
|
||||
|
||||
@@ -1,7 +1,77 @@
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::prelude::*;
|
||||
use datafusion::error::DataFusionError;
|
||||
use snafu::{Backtrace, ErrorCompat, Snafu};
|
||||
|
||||
common_error::define_opaque_error!(Error);
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum InnerError {
|
||||
#[snafu(display("Unsupported expr type: {}", name))]
|
||||
UnsupportedExpr { name: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("General catalog error: {}", source))]
|
||||
Catalog {
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Catalog not found: {}", catalog))]
|
||||
CatalogNotFound {
|
||||
catalog: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Schema not found: {}", schema))]
|
||||
SchemaNotFound {
|
||||
schema: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to do vector computation, source: {}", source))]
|
||||
VectorComputation {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create RecordBatch, source: {}", source))]
|
||||
CreateRecordBatch {
|
||||
#[snafu(backtrace)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for InnerError {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
use InnerError::*;
|
||||
|
||||
match self {
|
||||
UnsupportedExpr { .. } | CatalogNotFound { .. } | SchemaNotFound { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
Catalog { source } => source.status_code(),
|
||||
VectorComputation { source } => source.status_code(),
|
||||
CreateRecordBatch { source } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
fn backtrace_opt(&self) -> Option<&Backtrace> {
|
||||
ErrorCompat::backtrace(self)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerError> for Error {
|
||||
fn from(e: InnerError) -> Error {
|
||||
Error::new(e)
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl From<Error> for DataFusionError {
|
||||
|
||||
@@ -11,5 +11,6 @@ pub mod physical_planner;
|
||||
pub mod plan;
|
||||
pub mod planner;
|
||||
pub mod query_engine;
|
||||
pub mod sql;
|
||||
|
||||
pub use crate::query_engine::{QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef};
|
||||
|
||||
81
src/query/src/sql.rs
Normal file
81
src/query/src/sql.rs
Normal file
@@ -0,0 +1,81 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::vectors::{Helper, StringVector};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
|
||||
const SCHEMAS_COLUMN: &str = "Schemas";
|
||||
const TABLES_COLUMN: &str = "Tables";
|
||||
|
||||
pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) -> Result<Output> {
|
||||
// TODO(LFC): supports WHERE
|
||||
ensure!(
|
||||
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
|
||||
error::UnsupportedExprSnafu {
|
||||
name: stmt.kind.to_string(),
|
||||
}
|
||||
);
|
||||
|
||||
let catalog = catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.context(error::CatalogSnafu)?
|
||||
.context(error::CatalogNotFoundSnafu {
|
||||
catalog: DEFAULT_CATALOG_NAME,
|
||||
})?;
|
||||
let databases = catalog.schema_names().context(error::CatalogSnafu)?;
|
||||
|
||||
let databases = if let ShowKind::Like(ident) = stmt.kind {
|
||||
Helper::like_utf8(databases, &ident.value).context(error::VectorComputationSnafu)?
|
||||
} else {
|
||||
Arc::new(StringVector::from(databases))
|
||||
};
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
SCHEMAS_COLUMN,
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
)]));
|
||||
let records = RecordBatches::try_from_columns(schema, vec![databases])
|
||||
.context(error::CreateRecordBatchSnafu)?;
|
||||
Ok(Output::RecordBatches(records))
|
||||
}
|
||||
|
||||
pub fn show_tables(stmt: ShowTables, catalog_manager: CatalogManagerRef) -> Result<Output> {
|
||||
// TODO(LFC): supports WHERE
|
||||
ensure!(
|
||||
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
|
||||
error::UnsupportedExprSnafu {
|
||||
name: stmt.kind.to_string(),
|
||||
}
|
||||
);
|
||||
|
||||
let schema = stmt.database.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
|
||||
let schema = catalog_manager
|
||||
.schema(DEFAULT_CATALOG_NAME, schema)
|
||||
.context(error::CatalogSnafu)?
|
||||
.context(error::SchemaNotFoundSnafu { schema })?;
|
||||
let tables = schema.table_names().context(error::CatalogSnafu)?;
|
||||
|
||||
let tables = if let ShowKind::Like(ident) = stmt.kind {
|
||||
Helper::like_utf8(tables, &ident.value).context(error::VectorComputationSnafu)?
|
||||
} else {
|
||||
Arc::new(StringVector::from(tables))
|
||||
};
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
|
||||
TABLES_COLUMN,
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
)]));
|
||||
let records = RecordBatches::try_from_columns(schema, vec![tables])
|
||||
.context(error::CreateRecordBatchSnafu)?;
|
||||
Ok(Output::RecordBatches(records))
|
||||
}
|
||||
Reference in New Issue
Block a user