From 9d87c8b6ded68fb2c42fb2a6ba1e57cb23131542 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 25 Aug 2023 14:37:39 +0800 Subject: [PATCH] refactor(table): cleanup dist table (#2255) Signed-off-by: Zhenchi --- src/catalog/src/information_schema.rs | 4 +- src/frontend/src/catalog.rs | 8 +- src/frontend/src/instance/distributed.rs | 6 +- src/frontend/src/table.rs | 233 +++-------------------- src/frontend/src/table/scan.rs | 104 ---------- src/table/src/table/numbers.rs | 5 +- src/table/src/thin_table.rs | 15 +- tests-integration/src/grpc.rs | 4 - 8 files changed, 31 insertions(+), 348 deletions(-) delete mode 100644 src/frontend/src/table/scan.rs diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 8f45216c8e..ec118f942c 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -70,11 +70,9 @@ impl InformationSchemaProvider { pub fn table(&self, name: &str) -> Option { self.information_table(name).map(|table| { - let schema = table.schema(); let table_info = Self::table_info(self.catalog_name.clone(), &table); - let table_type = table.table_type(); let filter_pushdown = FilterPushDownType::Unsupported; - let thin_table = ThinTable::new(schema, table_info, table_type, filter_pushdown); + let thin_table = ThinTable::new(table_info, filter_pushdown); let data_source = Arc::new(InformationTableDataSource::new(table)); Arc::new(ThinTableAdapter::new(thin_table, data_source)) as _ diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 8479051500..5019a2d576 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -38,7 +38,6 @@ use common_meta::key::table_info::TableInfoKey; use common_meta::key::table_name::TableNameKey; use common_meta::key::{TableMetaKey, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::table_name::TableName; use common_telemetry::debug; use futures_util::TryStreamExt; use partition::manager::PartitionRuleManagerRef; @@ -417,12 +416,7 @@ impl CatalogManager for FrontendCatalogManager { .try_into() .context(catalog_err::InvalidTableInfoInCatalogSnafu)?, ); - let table = Arc::new(DistTable::new( - TableName::new(catalog, schema, table_name), - table_info, - Arc::new(self.clone()), - )); - Ok(Some(table)) + Ok(Some(DistTable::table(table_info))) } fn as_any(&self) -> &dyn Any { diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 6fed3bc65e..7bb26d59c0 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -129,11 +129,7 @@ impl DistInstance { create_table.table_id = Some(api::v1::TableId { id: table_id }); - let table = Arc::new(DistTable::new( - table_name.clone(), - table_info, - self.catalog_manager.clone(), - )); + let table = DistTable::table(table_info); let request = RegisterTableRequest { catalog: table_name.catalog_name.clone(), diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 46f4b78497..e5cac330aa 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -12,228 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::sync::Arc; -use async_trait::async_trait; -use client::Database; use common_error::ext::BoxedError; -use common_meta::table_name::TableName; -use common_query::error::Result as QueryResult; -use common_query::logical_plan::Expr; -use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; -use common_query::Output; -use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter; -use common_recordbatch::error::{ - ExternalSnafu as RecordBatchExternalSnafu, Result as RecordBatchResult, -}; -use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; -use datafusion::execution::context::TaskContext; -use datafusion::physical_plan::Partitioning; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use futures_util::StreamExt; -use snafu::prelude::*; +use common_recordbatch::SendableRecordBatchStream; +use store_api::data_source::DataSource; use store_api::storage::ScanRequest; -use table::error::TableOperationSnafu; -use table::metadata::{FilterPushDownType, TableInfoRef, TableType}; -use table::Table; +use table::metadata::{FilterPushDownType, TableInfoRef}; +use table::thin_table::{ThinTable, ThinTableAdapter}; +use table::TableRef; -use crate::catalog::FrontendCatalogManager; -use crate::table::scan::{DatanodeInstance, TableScanPlan}; +use crate::error::NotSupportedSnafu; pub mod delete; pub mod insert; -pub(crate) mod scan; #[derive(Clone)] -pub struct DistTable { - table_name: TableName, - table_info: TableInfoRef, - catalog_manager: Arc, -} - -#[async_trait] -impl Table for DistTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.table_info.meta.schema.clone() - } - - fn table_info(&self) -> TableInfoRef { - self.table_info.clone() - } - - fn table_type(&self) -> TableType { - self.table_info.table_type - } - - // TODO(ruihang): DistTable should not call this method directly - async fn scan_to_stream( - &self, - request: ScanRequest, - ) -> table::Result { - let partition_manager = self.catalog_manager.partition_manager(); - let datanode_clients = self.catalog_manager.datanode_clients(); - - let table_id = self.table_info.table_id(); - let partition_rule = partition_manager - .find_table_partition_rule(table_id) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - - let regions = partition_manager - .find_regions_by_filters(partition_rule, &request.filters) - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - let datanodes = partition_manager - .find_region_datanodes(table_id, regions) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - - let table_name = &self.table_name; - let mut partition_execs = Vec::with_capacity(datanodes.len()); - for (datanode, _regions) in datanodes.iter() { - let client = datanode_clients.get_client(datanode).await; - let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); - let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); - - partition_execs.push(Arc::new(PartitionExec { - table_name: table_name.clone(), - datanode_instance, - projection: request.projection.clone(), - filters: request.filters.clone(), - limit: request.limit, - })); - } - - let stream = Box::pin(async_stream::stream!({ - for partition_exec in partition_execs { - let mut stream = partition_exec.scan_to_stream().await?; - while let Some(record_batch) = stream.next().await { - yield record_batch; - } - } - })); - - let schema = project_schema(self.schema(), request.projection.as_ref()); - let stream = RecordBatchStreamAdaptor { - schema, - stream, - output_ordering: None, - }; - - Ok(Box::pin(stream)) - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> table::Result> { - Ok(vec![FilterPushDownType::Inexact; filters.len()]) - } -} +pub struct DistTable; impl DistTable { - pub fn new( - table_name: TableName, - table_info: TableInfoRef, - catalog_manager: Arc, - ) -> Self { - Self { - table_name, - table_info, - catalog_manager, + pub fn table(table_info: TableInfoRef) -> TableRef { + let thin_table = ThinTable::new(table_info, FilterPushDownType::Inexact); + let data_source = Arc::new(DummyDataSource); + Arc::new(ThinTableAdapter::new(thin_table, data_source)) + } +} + +pub struct DummyDataSource; + +impl DataSource for DummyDataSource { + fn get_stream(&self, _request: ScanRequest) -> Result { + NotSupportedSnafu { + feat: "get stream from a distributed table", } - } -} - -fn project_schema(table_schema: SchemaRef, projection: Option<&Vec>) -> SchemaRef { - if let Some(projection) = projection { - let columns = table_schema.column_schemas(); - let projected = projection - .iter() - .map(|x| columns[*x].clone()) - .collect::>(); - Arc::new(Schema::new(projected)) - } else { - table_schema - } -} - -#[derive(Debug)] -struct DistTableScan { - schema: SchemaRef, - partition_execs: Vec>, -} - -impl PhysicalPlan for DistTableScan { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.partition_execs.len()) - } - - fn children(&self) -> Vec { - vec![] - } - - fn with_new_children(&self, _children: Vec) -> QueryResult { - unimplemented!() - } - - fn execute( - &self, - partition: usize, - _context: Arc, - ) -> QueryResult { - let exec = self.partition_execs[partition].clone(); - let stream = Box::pin(async move { exec.scan_to_stream().await }); - let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream); - Ok(Box::pin(stream)) - } -} - -#[derive(Debug)] -struct PartitionExec { - table_name: TableName, - datanode_instance: DatanodeInstance, - projection: Option>, - filters: Vec, - limit: Option, -} - -impl PartitionExec { - async fn scan_to_stream(&self) -> RecordBatchResult { - let plan: TableScanPlan = TableScanPlan { - table_name: self.table_name.clone(), - projection: self.projection.clone(), - filters: self.filters.clone(), - limit: self.limit, - }; - - let output = self - .datanode_instance - .grpc_table_scan(plan) - .await - .map_err(BoxedError::new) - .context(RecordBatchExternalSnafu)?; - - let Output::Stream(stream) = output else { - unreachable!() - }; - - Ok(stream) + .fail() + .map_err(BoxedError::new) } } @@ -245,6 +58,8 @@ pub(crate) mod test { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute}; + use common_meta::table_name::TableName; + use common_query::prelude::Expr; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; use datafusion_expr::{lit, Operator}; use meta_client::client::MetaClient; diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs deleted file mode 100644 index d149ef2805..0000000000 --- a/src/frontend/src/table/scan.rs +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::fmt::Formatter; -use std::sync::Arc; - -use client::Database; -use common_meta::table_name::TableName; -use common_query::prelude::Expr; -use common_query::Output; -use datafusion::datasource::DefaultTableSource; -use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; -use snafu::ResultExt; -use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; -use table::table::adapter::DfTableProviderAdapter; -use table::TableRef; - -use crate::error::{self, Result}; - -#[derive(Clone)] -pub struct DatanodeInstance { - table: TableRef, - db: Database, -} - -impl std::fmt::Debug for DatanodeInstance { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str("DatanodeInstance") - } -} - -impl DatanodeInstance { - pub(crate) fn new(table: TableRef, db: Database) -> Self { - Self { table, db } - } - - pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result { - let logical_plan = self.build_logical_plan(&plan)?; - - let substrait_plan = DFLogicalSubstraitConvertor - .encode(&logical_plan) - .context(error::EncodeSubstraitLogicalPlanSnafu)?; - - self.db - .logical_plan(substrait_plan.to_vec(), None) - .await - .context(error::RequestDatanodeSnafu) - } - - fn build_logical_plan(&self, table_scan: &TableScanPlan) -> Result { - let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone())); - - let mut builder = LogicalPlanBuilder::scan_with_filters( - table_scan.table_name.to_string(), - Arc::new(DefaultTableSource::new(table_provider)), - table_scan.projection.clone(), - table_scan - .filters - .iter() - .map(|x| x.df_expr().clone()) - .collect::>(), - ) - .context(error::BuildDfLogicalPlanSnafu)?; - - if let Some(filter) = table_scan - .filters - .iter() - .map(|x| x.df_expr()) - .cloned() - .reduce(|accum, expr| accum.and(expr)) - { - builder = builder - .filter(filter) - .context(error::BuildDfLogicalPlanSnafu)?; - } - - if table_scan.limit.is_some() { - builder = builder - .limit(0, table_scan.limit) - .context(error::BuildDfLogicalPlanSnafu)?; - } - - builder.build().context(error::BuildDfLogicalPlanSnafu) - } -} - -#[derive(Debug)] -pub(crate) struct TableScanPlan { - pub table_name: TableName, - pub projection: Option>, - pub filters: Vec, - pub limit: Option, -} diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 0c9cc62083..d761abc50a 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -48,14 +48,11 @@ impl NumbersTable { } pub fn table_with_name(table_id: TableId, name: String) -> TableRef { - let schema = Self::schema(); let thin_table = ThinTable::new( - schema.clone(), Self::table_info(table_id, name, "test_engine".to_string()), - TableType::Temporary, FilterPushDownType::Unsupported, ); - let data_source = Arc::new(NumbersDataSource::new(schema)); + let data_source = Arc::new(NumbersDataSource::new(Self::schema())); Arc::new(ThinTableAdapter::new(thin_table, data_source)) } diff --git a/src/table/src/thin_table.rs b/src/table/src/thin_table.rs index a22b06cf47..b10dd84acb 100644 --- a/src/table/src/thin_table.rs +++ b/src/table/src/thin_table.rs @@ -29,23 +29,14 @@ use crate::Table; /// The `ThinTable` struct will replace the `Table` trait. /// TODO(zhongzc): After completion, perform renaming and documentation work. pub struct ThinTable { - schema: SchemaRef, table_info: TableInfoRef, - table_type: TableType, filter_pushdown: FilterPushDownType, } impl ThinTable { - pub fn new( - schema: SchemaRef, - table_info: TableInfoRef, - table_type: TableType, - filter_pushdown: FilterPushDownType, - ) -> Self { + pub fn new(table_info: TableInfoRef, filter_pushdown: FilterPushDownType) -> Self { Self { - schema, table_info, - table_type, filter_pushdown, } } @@ -69,7 +60,7 @@ impl Table for ThinTableAdapter { } fn schema(&self) -> SchemaRef { - self.table.schema.clone() + self.table.table_info.meta.schema.clone() } fn table_info(&self) -> TableInfoRef { @@ -77,7 +68,7 @@ impl Table for ThinTableAdapter { } fn table_type(&self) -> TableType { - self.table.table_type + self.table.table_info.table_type } async fn scan_to_stream(&self, request: ScanRequest) -> Result { diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index f23dafa1e4..6b9ba75cec 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -30,12 +30,10 @@ mod test { use common_query::Output; use common_recordbatch::RecordBatches; use frontend::instance::Instance; - use frontend::table::DistTable; use query::parser::QueryLanguageParser; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; use store_api::storage::RegionNumber; - use table::Table; use tests::{has_parquet_file, test_region_dir}; use crate::tests; @@ -332,7 +330,6 @@ CREATE TABLE {table_name} ( .await .unwrap() .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); let table_id = table.table_info().table_id(); let table_route_value = instance @@ -627,7 +624,6 @@ CREATE TABLE {table_name} ( .await .unwrap() .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); let table_id = table.table_info().ident.table_id; let table_route_value = instance .table_metadata_manager()