diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
index d644caa48a..2304b5197c 100644
--- a/src/catalog/src/information_schema.rs
+++ b/src/catalog/src/information_schema.rs
@@ -26,6 +26,7 @@ use futures_util::StreamExt;
 use snafu::ResultExt;
 use store_api::storage::ScanRequest;
 use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
+use table::metadata::TableType;
 use table::{Result as TableResult, Table, TableRef};
 
 use self::columns::InformationSchemaColumns;
@@ -102,6 +103,10 @@ impl Table for InformationTable {
         unreachable!("Should not call table_info() of InformationTable directly")
     }
 
+    fn table_type(&self) -> table::metadata::TableType {
+        TableType::View
+    }
+
     async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
         let projection = request.projection;
         let projected_schema = if let Some(projection) = &projection {
diff --git a/src/common/substrait/src/df_substrait.rs b/src/common/substrait/src/df_substrait.rs
index 45d71a6ec3..34db1cd39c 100644
--- a/src/common/substrait/src/df_substrait.rs
+++ b/src/common/substrait/src/df_substrait.rs
@@ -16,7 +16,6 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes, BytesMut};
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use datafusion::catalog::catalog::CatalogList;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -43,9 +42,10 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
         &self,
         message: B,
         catalog_list: Arc<dyn CatalogList>,
+        catalog: &str,
+        schema: &str,
     ) -> Result {
-        let state_config = SessionConfig::new()
-            .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
+        let state_config = SessionConfig::new().with_default_catalog_and_schema(catalog, schema);
         let state = SessionState::with_config_rt(state_config, Arc::new(RuntimeEnv::default()));
         let mut context = SessionContext::with_state(state);
         context.register_catalog_list(catalog_list);
diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs
index 07b2fd5c6b..ebefcb662e 100644
--- a/src/common/substrait/src/lib.rs
+++ b/src/common/substrait/src/lib.rs
@@ -36,6 +36,8 @@ pub trait SubstraitPlan {
         &self,
         message: B,
         catalog_list: Arc<dyn CatalogList>,
+        catalog: &str,
+        schema: &str,
     ) -> Result;
 
     fn encode(&self, plan: Self::Plan) -> Result;
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index 327adc2d5e..c94754b93b 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -74,7 +74,12 @@ impl Instance {
             .await?;
 
         let logical_plan = DFLogicalSubstraitConvertor
-            .decode(plan_bytes.as_slice(), Arc::new(catalog_list) as Arc<_>)
+            .decode(
+                plan_bytes.as_slice(),
+                Arc::new(catalog_list) as Arc<_>,
+                &ctx.current_catalog(),
+                &ctx.current_schema(),
+            )
             .await
             .context(DecodeLogicalPlanSnafu)?;
 
diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs
index 9b4ac6376a..3003d6f436 100644
--- a/src/query/src/dist_plan/planner.rs
+++ b/src/query/src/dist_plan/planner.rs
@@ -23,15 +23,18 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_meta::peer::Peer;
 use common_meta::table_name::TableName;
 use datafusion::common::Result;
+use datafusion::datasource::DefaultTableSource;
 use datafusion::execution::context::SessionState;
 use datafusion::physical_plan::planner::ExtensionPlanner;
 use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
-use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
 use datafusion_common::{DataFusionError, TableReference};
 use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
 use partition::manager::PartitionRuleManager;
 use snafu::ResultExt;
 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
+pub use table::metadata::TableType;
+use table::table::adapter::DfTableProviderAdapter;
 
 use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
 use crate::error;
@@ -82,6 +85,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
                 .map(Some);
         };
         let input_schema = input_plan.schema().clone();
+        let input_plan = self.set_table_name(&table_name, input_plan.clone())?;
         let substrait_plan: Bytes = DFLogicalSubstraitConvertor
             .encode(input_plan.clone())
             .context(error::EncodeSubstraitLogicalPlanSnafu)?
@@ -100,7 +104,7 @@
                 Ok(Some(Arc::new(exec) as _))
             }
             Err(_) => planner
-                .create_physical_plan(input_plan, session_state)
+                .create_physical_plan(&input_plan, session_state)
                 .await
                 .map(Some),
         }
@@ -119,6 +123,12 @@
         Ok(extractor.table_name)
     }
 
+    /// Set the fully resolved table name on the TableScan plan
+    fn set_table_name(&self, name: &TableName, plan: LogicalPlan) -> Result<LogicalPlan> {
+        // rewrite the table name of every TableScan node to the fully resolved one
+        plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name))
+    }
+
     async fn get_peers(&self, table_name: &TableName) -> Result<Vec<Peer>> {
         self.partition_manager
             .find_table_region_leaders(table_name)
@@ -142,6 +152,23 @@ impl TreeNodeVisitor for TableNameExtractor {
     fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
         match node {
             LogicalPlan::TableScan(scan) => {
+                if let Some(source) = scan.source.as_any().downcast_ref::<DefaultTableSource>() {
+                    if let Some(provider) = source
+                        .table_provider
+                        .as_any()
+                        .downcast_ref::<DfTableProviderAdapter>()
+                    {
+                        if provider.table().table_type() == TableType::Base {
+                            let info = provider.table().table_info();
+                            self.table_name = Some(TableName::new(
+                                info.catalog_name.clone(),
+                                info.schema_name.clone(),
+                                info.name.clone(),
+                            ));
+                            return Ok(VisitRecursion::Stop);
+                        }
+                    }
+                }
                 match &scan.table_name {
                     TableReference::Full {
                         catalog,
@@ -178,3 +205,24 @@
         }
     }
 }
+
+struct TableNameRewriter;
+
+impl TableNameRewriter {
+    fn rewrite_table_name(
+        plan: LogicalPlan,
+        name: &TableName,
+    ) -> datafusion_common::Result<Transformed<LogicalPlan>> {
+        Ok(match plan {
+            LogicalPlan::TableScan(mut table_scan) => {
+                table_scan.table_name = TableReference::full(
+                    name.catalog_name.clone(),
+                    name.schema_name.clone(),
+                    name.table_name.clone(),
+                );
+                Transformed::Yes(LogicalPlan::TableScan(table_scan))
+            }
+            _ => Transformed::No(plan),
+        })
+    }
+}
diff --git a/tests/cases/standalone/common/create/upper_case_table_name.result b/tests/cases/standalone/common/create/upper_case_table_name.result
new file mode 100644
index 0000000000..e517358991
--- /dev/null
+++ b/tests/cases/standalone/common/create/upper_case_table_name.result
@@ -0,0 +1,39 @@
+create database upper_case_table_name;
+
+Affected Rows: 1
+
+use upper_case_table_name;
+
+++
+++
+
+create table system_Metric(ts timestamp time index);
+
+Affected Rows: 0
+
+insert into system_Metric values (0), (1);
+
+Affected Rows: 2
+
+select * from system_Metric;
+
+Error: 3000(PlanQuery), Error during planning: Table not found: greptime.upper_case_table_name.system_metric
+
+select * from "system_Metric";
+
++-------------------------+
+| ts                      |
++-------------------------+
+| 1970-01-01T00:00:00     |
+| 1970-01-01T00:00:00.001 |
++-------------------------+
+
+drop table system_Metric;
+
+Affected Rows: 1
+
+use public;
+
+++
+++
+
diff --git a/tests/cases/standalone/common/create/upper_case_table_name.sql b/tests/cases/standalone/common/create/upper_case_table_name.sql
new file mode 100644
index 0000000000..32bb1fc3b3
--- /dev/null
+++ b/tests/cases/standalone/common/create/upper_case_table_name.sql
@@ -0,0 +1,15 @@
+create database upper_case_table_name;
+
+use upper_case_table_name;
+
+create table system_Metric(ts timestamp time index);
+
+insert into system_Metric values (0), (1);
+
+select * from system_Metric;
+
+select * from "system_Metric";
+
+drop table system_Metric;
+
+use public;
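
Note for reviewers: a minimal sketch (not part of the patch) of how the widened SubstraitPlan::decode signature is meant to be driven after this change, mirroring the datanode call site above. The helper name decode_with_session_defaults is hypothetical, and MemoryCatalogList only stands in for the datanode's real catalog adapter, so tables referenced by the plan would not actually resolve against it.

use std::sync::Arc;

use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

// Hypothetical helper: decode a Substrait-encoded plan so that unqualified
// table names resolve against the session's current catalog and schema
// instead of the previously hard-coded DEFAULT_CATALOG_NAME / DEFAULT_SCHEMA_NAME.
async fn decode_with_session_defaults(
    plan_bytes: &[u8],
    current_catalog: &str,
    current_schema: &str,
) {
    // An empty in-memory catalog list keeps the sketch self-contained; a real
    // caller passes its own catalog adapter so the referenced tables exist.
    let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;

    if let Ok(_plan) = DFLogicalSubstraitConvertor
        .decode(plan_bytes, catalog_list, current_catalog, current_schema)
        .await
    {
        // _plan is a DataFusion LogicalPlan, ready for physical planning.
    }
}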