feat: support where in show (#1829)

* feat: support where in show

* fix: lift schema out of match

* fix: rename

* fix: improve error handling
Author: Niwaka
Date: 2023-07-07 14:45:54 +09:00 (committed by GitHub)
Parent: 03e30652c8
Commit: 8dcb12e317
5 changed files with 200 additions and 27 deletions


@@ -31,24 +31,31 @@ use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan, PhysicalP
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_recordbatch::{
EmptyRecordBatchStream, RecordBatch, RecordBatches, SendableRecordBatchStream,
};
use common_telemetry::timer;
use datafusion::common::Column;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
use datafusion::prelude::SessionContext;
use datafusion_common::{ResolvedTableReference, ScalarValue};
use datafusion_expr::{DmlStatement, Expr as DfExpr, LogicalPlan as DfLogicalPlan, WriteOp};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{BinaryOperator, Expr, Value};
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::error::{
CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTimestampColumnSnafu,
QueryExecutionSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu,
CatalogSnafu, CreateRecordBatchSnafu, CreateSchemaSnafu, DataFusionSnafu,
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableNotFoundSnafu,
UnimplementedSnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
@@ -386,6 +393,86 @@ impl QueryExecutor for DatafusionQueryEngine {
}
}
fn convert_filter_to_df_filter(filter: Expr) -> Result<DfExpr> {
match filter {
Expr::BinaryOp { left, op, right } => {
let left = convert_filter_to_df_filter(*left)?;
let right = convert_filter_to_df_filter(*right)?;
match op {
BinaryOperator::Eq => Ok(left.eq(right)),
_ => UnimplementedSnafu {
operation: format!("convert BinaryOperator into datafusion Expr, op: {op}"),
}
.fail(),
}
}
Expr::Value(value) => match value {
Value::SingleQuotedString(v) => Ok(DfExpr::Literal(ScalarValue::Utf8(Some(v)))),
_ => UnimplementedSnafu {
operation: format!("convert Expr::Value into datafusion Expr, value: {value}"),
}
.fail(),
},
Expr::Identifier(ident) => Ok(DfExpr::Column(Column::from_name(ident.value))),
_ => UnimplementedSnafu {
operation: format!("convert Expr into datafusion Expr, Expr: {filter}"),
}
.fail(),
}
}
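For illustration, a hypothetical module-internal test (not part of the diff) showing how one concrete `WHERE` clause maps through `convert_filter_to_df_filter`; the `Ident` type is assumed to be re-exported by `sql::ast` alongside the other sqlparser AST types:

```rust
use datafusion::common::Column;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr as DfExpr;
use sql::ast::{BinaryOperator, Expr, Ident, Value};

#[test]
fn convert_simple_eq_filter() {
    // AST for the clause: "Tables" = 'monitor'
    let filter = Expr::BinaryOp {
        left: Box::new(Expr::Identifier(Ident::new("Tables"))),
        op: BinaryOperator::Eq,
        right: Box::new(Expr::Value(Value::SingleQuotedString("monitor".to_string()))),
    };
    // Identifier -> Column, single-quoted string -> Utf8 literal, Eq -> eq().
    let expected = DfExpr::Column(Column::from_name("Tables"))
        .eq(DfExpr::Literal(ScalarValue::Utf8(Some("monitor".to_string()))));
    assert_eq!(convert_filter_to_df_filter(filter).unwrap(), expected);
}
```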
/// Creates a table in memory from the record batch and executes the show statement against it, applying the optional filter.
pub async fn execute_show_with_filter(
record_batch: RecordBatch,
filter: Option<Expr>,
) -> Result<Output> {
let table_name = "table_name";
let column_schemas = record_batch.schema.column_schemas().to_vec();
let context = SessionContext::new();
context
.register_batch(table_name, record_batch.into_df_record_batch())
.context(error::DatafusionSnafu {
msg: "Fail to register a record batch as a table",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut dataframe = context
.sql(&format!("SELECT * FROM {table_name}"))
.await
.context(error::DatafusionSnafu {
msg: "Fail to execute a sql",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
if let Some(filter) = filter {
let filter = convert_filter_to_df_filter(filter)?;
dataframe = dataframe
.filter(filter)
.context(error::DatafusionSnafu {
msg: "Fail to filter",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
}
let df_batches = dataframe
.collect()
.await
.context(error::DatafusionSnafu {
msg: "Fail to collect the record batches",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut batches = Vec::with_capacity(df_batches.len());
let schema = Arc::new(Schema::try_new(column_schemas).context(CreateSchemaSnafu)?);
for df_batch in df_batches.into_iter() {
let batch = RecordBatch::try_from_df_record_batch(schema.clone(), df_batch)
.context(CreateRecordBatchSnafu)?;
batches.push(batch);
}
let record_batches = RecordBatches::try_new(schema, batches).context(CreateRecordBatchSnafu)?;
Ok(Output::RecordBatches(record_batches))
}
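The helper's core pattern is plain DataFusion: register the batch as a named in-memory table, `SELECT *` from it, and push the converted filter down onto the resulting dataframe. A self-contained sketch of that pattern using raw Arrow types, assuming only the `datafusion` and `tokio` crates (table and column names are illustrative):

```rust
use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::{col, lit, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("Tables", DataType::Utf8, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(vec!["monitor", "system_metrics"]))],
    )?;

    let ctx = SessionContext::new();
    // Register the batch under a throwaway name...
    ctx.register_batch("table_name", batch)?;
    // ...select everything back out...
    let df = ctx.sql("SELECT * FROM table_name").await?;
    // ...and apply the converted WHERE clause as a dataframe filter.
    let batches = df.filter(col("Tables").eq(lit("monitor")))?.collect().await?;
    assert_eq!(batches[0].num_rows(), 1);
    Ok(())
}
```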
#[cfg(test)]
mod tests {
use std::borrow::Cow::Borrowed;
@@ -394,12 +481,17 @@ mod tests {
use catalog::{CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_query::Output;
use common_recordbatch::util;
use common_recordbatch::{util, RecordBatch};
use datafusion::prelude::{col, lit};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::types::StringType;
use datatypes::vectors::{Helper, StringVectorBuilder, UInt32Vector, UInt64Vector, VectorRef};
use session::context::QueryContext;
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::show::{ShowKind, ShowTables};
use sql::statements::statement::Statement;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
@@ -544,4 +636,62 @@ mod tests {
);
assert_eq!("Limit: skip=0, fetch=20\n Aggregate: groupBy=[[]], aggr=[[SUM(numbers.number)]]\n TableScan: numbers projection=[number]", format!("{}", logical_plan.display_indent()));
}
#[tokio::test]
async fn test_show_tables() {
// No filter
let column_schemas = vec![ColumnSchema::new(
"Tables",
ConcreteDataType::String(StringType),
false,
)];
let schema = Arc::new(Schema::new(column_schemas));
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("monitor"));
builder.push(Some("system_metrics"));
let columns = vec![builder.to_vector()];
let record_batch = RecordBatch::new(schema, columns).unwrap();
let output = execute_show_with_filter(record_batch, None).await.unwrap();
let Output::RecordBatches(record_batches) = output else {unreachable!()};
let expected = "\
+----------------+
| Tables |
+----------------+
| monitor |
| system_metrics |
+----------------+";
assert_eq!(record_batches.pretty_print().unwrap(), expected);
// Filter
let column_schemas = vec![ColumnSchema::new(
"Tables",
ConcreteDataType::String(StringType),
false,
)];
let schema = Arc::new(Schema::new(column_schemas));
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push(Some("monitor"));
builder.push(Some("system_metrics"));
let columns = vec![builder.to_vector()];
let record_batch = RecordBatch::new(schema, columns).unwrap();
let statement = ParserContext::create_with_dialect(
"SHOW TABLES WHERE \"Tables\"='monitor'",
&GreptimeDbDialect {},
)
.unwrap()[0]
.clone();
let Statement::ShowTables(ShowTables { kind, .. }) = statement else {unreachable!()};
let ShowKind::Where(filter) = kind else {unreachable!()};
let output = execute_show_with_filter(record_batch, Some(filter))
.await
.unwrap();
let Output::RecordBatches(record_batches) = output else {unreachable!()};
let expected = "\
+---------+
| Tables |
+---------+
| monitor |
+---------+";
assert_eq!(record_batches.pretty_print().unwrap(), expected);
}
}


@@ -60,6 +60,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create Schema, source: {}", source))]
CreateSchema {
source: datatypes::error::Error,
location: Location,
},
#[snafu(display("Failure during query execution, source: {}", source))]
QueryExecution {
source: BoxedError,
@@ -258,6 +264,7 @@ impl ErrorExt for Error {
ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(),
RemoteRequest { source, .. } => source.status_code(),
UnexpectedOutputKind { .. } => StatusCode::Unexpected,
CreateSchema { source, .. } => source.status_code(),
}
}
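The new variant follows the usual snafu pattern: the derive generates a `CreateSchemaSnafu` context selector, and `.context(CreateSchemaSnafu)` in `execute_show_with_filter` wraps the source error into the variant. A minimal standalone sketch of that mechanism, with an illustrative `io::Error` source standing in for `datatypes::error::Error` and the `location` field omitted:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to create Schema, source: {}", source))]
    CreateSchema { source: std::io::Error },
}

fn load() -> Result<Vec<u8>, Error> {
    // The derive produced a `CreateSchemaSnafu` selector; `.context(...)`
    // converts the io::Error into `Error::CreateSchema { source }`.
    std::fs::read("schema.json").context(CreateSchemaSnafu)
}
```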


@@ -26,7 +26,7 @@ use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::vectors::{Helper, StringVector};
@@ -42,6 +42,7 @@ use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
use table::requests::{IMMUTABLE_TABLE_LOCATION_KEY, IMMUTABLE_TABLE_PATTERN_KEY};
use table::TableRef;
use crate::datafusion::execute_show_with_filter;
use crate::error::{self, Result};
const SCHEMAS_COLUMN: &str = "Schemas";
@@ -133,13 +134,6 @@ pub async fn show_tables(
catalog_manager: CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
ensure!(
matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)),
error::UnsupportedExprSnafu {
name: stmt.kind.to_string(),
}
);
let schema = if let Some(database) = stmt.database {
database
} else {
@@ -153,21 +147,33 @@ pub async fn show_tables(
// TODO(dennis): Specify the order of the results in schema provider API
tables.sort();
let tables = if let ShowKind::Like(ident) = stmt.kind {
Helper::like_utf8(tables, &ident.value).context(error::VectorComputationSnafu)?
} else {
Arc::new(StringVector::from(tables))
};
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
TABLES_COLUMN,
ConcreteDataType::string_datatype(),
false,
)]));
let records = RecordBatches::try_from_columns(schema, vec![tables])
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::RecordBatches(records))
match stmt.kind {
ShowKind::All => {
let tables = Arc::new(StringVector::from(tables)) as _;
let records = RecordBatches::try_from_columns(schema, vec![tables])
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::RecordBatches(records))
}
ShowKind::Where(filter) => {
let columns = vec![Arc::new(StringVector::from(tables)) as _];
let record_batch =
RecordBatch::new(schema, columns).context(error::CreateRecordBatchSnafu)?;
let result = execute_show_with_filter(record_batch, Some(filter)).await?;
Ok(result)
}
ShowKind::Like(ident) => {
let tables =
Helper::like_utf8(tables, &ident.value).context(error::VectorComputationSnafu)?;
let records = RecordBatches::try_from_columns(schema, vec![tables])
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::RecordBatches(records))
}
}
}
pub fn show_create_table(table: TableRef, partitions: Option<Partitions>) -> Result<Output> {
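For reference, a hypothetical test sketching how the three `ShowKind`s distinguished by the new `match` in `show_tables` come out of the parser; the `SHOW TABLES` and `LIKE` inputs are assumptions modeled on the `WHERE` case exercised in the `datafusion.rs` test above:

```rust
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::show::{ShowKind, ShowTables};
use sql::statements::statement::Statement;

fn kind_of(sql: &str) -> ShowKind {
    let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap()[0].clone();
    let Statement::ShowTables(ShowTables { kind, .. }) = stmt else { unreachable!() };
    kind
}

#[test]
fn show_kinds() {
    assert!(matches!(kind_of("SHOW TABLES"), ShowKind::All));
    assert!(matches!(kind_of("SHOW TABLES LIKE 'mon%'"), ShowKind::Like(_)));
    // New in this change: WHERE is routed through execute_show_with_filter.
    assert!(matches!(
        kind_of("SHOW TABLES WHERE \"Tables\"='monitor'"),
        ShowKind::Where(_)
    ));
}
```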


@@ -99,6 +99,14 @@ SHOW TABLES FROM public;
| scripts |
+---------+
SHOW TABLES FROM public WHERE Tables='numbers';
+---------+
| Tables |
+---------+
| numbers |
+---------+
DROP SCHEMA test_public_schema;
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_public_schema;, keyword: SCHEMA


@@ -32,6 +32,8 @@ SHOW TABLES FROM test_public_schema;
SHOW TABLES FROM public;
SHOW TABLES FROM public WHERE Tables='numbers';
DROP SCHEMA test_public_schema;
SELECT * FROM test_public_schema.hello;