diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 699914c1a2..90f81e4584 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -15,25 +15,23 @@ mod columns; mod tables; -use std::any::Any; use std::collections::HashMap; use std::sync::{Arc, Weak}; -use async_trait::async_trait; -use common_catalog::consts::{ - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, - INFORMATION_SCHEMA_TABLES_TABLE_ID, -}; +use common_catalog::consts::INFORMATION_SCHEMA_NAME; use common_error::ext::BoxedError; use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; use datatypes::schema::SchemaRef; use futures_util::StreamExt; use snafu::ResultExt; -use store_api::data_source::{DataSource, TableFactory}; +use store_api::data_source::DataSource; use store_api::storage::{ScanRequest, TableId}; use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu}; -use table::metadata::{TableIdent, TableInfoBuilder, TableMetaBuilder, TableType}; -use table::{Result as TableResult, Table, TableRef}; +use table::metadata::{ + FilterPushDownType, TableIdent, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, +}; +use table::thin_table::{ThinTable, ThinTableAdapter}; +use table::TableRef; use self::columns::InformationSchemaColumns; use crate::error::Result; @@ -62,167 +60,90 @@ impl InformationSchemaProvider { catalog_name: String, catalog_manager: Weak, ) -> HashMap { + let provider = Self::new(catalog_name, catalog_manager); + let mut schema = HashMap::new(); - - schema.insert( - TABLES.to_string(), - Arc::new(InformationTable::new( - catalog_name.clone(), - INFORMATION_SCHEMA_TABLES_TABLE_ID, - TABLES.to_string(), - Arc::new(InformationSchemaTables::new( - catalog_name.clone(), - catalog_manager.clone(), - )), - )) as _, - ); - schema.insert( - COLUMNS.to_string(), - Arc::new(InformationTable::new( - catalog_name.clone(), - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, 
- COLUMNS.to_string(), - Arc::new(InformationSchemaColumns::new(catalog_name, catalog_manager)), - )) as _, - ); - + schema.insert(TABLES.to_owned(), provider.table(TABLES).unwrap()); + schema.insert(COLUMNS.to_owned(), provider.table(COLUMNS).unwrap()); schema } - pub fn table(&self, name: &str) -> Result> { - let (stream_builder, table_id) = match name.to_ascii_lowercase().as_ref() { - TABLES => ( - Arc::new(InformationSchemaTables::new( - self.catalog_name.clone(), - self.catalog_manager.clone(), - )) as _, - INFORMATION_SCHEMA_TABLES_TABLE_ID, - ), - COLUMNS => ( - Arc::new(InformationSchemaColumns::new( - self.catalog_name.clone(), - self.catalog_manager.clone(), - )) as _, - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, - ), - _ => { - return Ok(None); - } - }; - - Ok(Some(Arc::new(InformationTable::new( - self.catalog_name.clone(), - table_id, - name.to_string(), - stream_builder, - )))) + pub fn table(&self, name: &str) -> Option { + self.information_table(name).map(|table| { + let schema = table.schema(); + let table_info = Self::table_info(self.catalog_name.clone(), &table); + let table_type = table.table_type(); + let data_source = Arc::new(InformationTableDataSource::new(table)); + let filter_pushdown = FilterPushDownType::Unsupported; + let thin_table = ThinTable::new(schema, table_info, table_type, filter_pushdown); + Arc::new(ThinTableAdapter::new(thin_table, data_source)) as _ + }) } - pub fn table_factory(&self, name: &str) -> Result> { - let (stream_builder, table_id) = match name.to_ascii_lowercase().as_ref() { - TABLES => ( - Arc::new(InformationSchemaTables::new( - self.catalog_name.clone(), - self.catalog_manager.clone(), - )) as _, - INFORMATION_SCHEMA_TABLES_TABLE_ID, - ), - COLUMNS => ( - Arc::new(InformationSchemaColumns::new( - self.catalog_name.clone(), - self.catalog_manager.clone(), - )) as _, - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, - ), - _ => { - return Ok(None); - } - }; - let data_source = Arc::new(InformationTable::new( - 
self.catalog_name.clone(), - table_id, - name.to_string(), - stream_builder, - )); - - Ok(Some(Arc::new(move || data_source.clone()))) - } -} - -// TODO(ruihang): make it a more generic trait: -// https://github.com/GreptimeTeam/greptimedb/pull/1639#discussion_r1205001903 -pub trait InformationStreamBuilder: Send + Sync { - fn to_stream(&self) -> Result; - - fn schema(&self) -> SchemaRef; -} - -pub struct InformationTable { - catalog_name: String, - table_id: TableId, - name: String, - stream_builder: Arc, -} - -impl InformationTable { - pub fn new( - catalog_name: String, - table_id: TableId, - name: String, - stream_builder: Arc, - ) -> Self { - Self { - catalog_name, - table_id, - name, - stream_builder, + fn information_table(&self, name: &str) -> Option { + match name.to_ascii_lowercase().as_str() { + TABLES => Some(Arc::new(InformationSchemaTables::new( + self.catalog_name.clone(), + self.catalog_manager.clone(), + )) as _), + COLUMNS => Some(Arc::new(InformationSchemaColumns::new( + self.catalog_name.clone(), + self.catalog_manager.clone(), + )) as _), + _ => None, } } -} -#[async_trait] -impl Table for InformationTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.stream_builder.schema() - } - - fn table_info(&self) -> table::metadata::TableInfoRef { + fn table_info(catalog_name: String, table: &InformationTableRef) -> TableInfoRef { let table_meta = TableMetaBuilder::default() - .schema(self.stream_builder.schema()) + .schema(table.schema()) .primary_key_indices(vec![]) .next_column_id(0) .build() .unwrap(); - Arc::new( - TableInfoBuilder::default() - .ident(TableIdent { - table_id: self.table_id, - version: 0, - }) - .name(self.name.clone()) - .catalog_name(self.catalog_name.clone()) - .schema_name(INFORMATION_SCHEMA_NAME.to_string()) - .meta(table_meta) - .table_type(TableType::Temporary) - .build() - .unwrap(), - ) + let table_info = TableInfoBuilder::default() + .ident(TableIdent { + table_id: 
table.table_id(), + version: 0, + }) + .name(table.table_name().to_owned()) + .catalog_name(catalog_name) + .schema_name(INFORMATION_SCHEMA_NAME.to_owned()) + .meta(table_meta) + .table_type(table.table_type()) + .build() + .unwrap(); + Arc::new(table_info) } +} + +trait InformationTable { + fn table_id(&self) -> TableId; + + fn table_name(&self) -> &'static str; + + fn schema(&self) -> SchemaRef; + + fn to_stream(&self) -> Result; fn table_type(&self) -> TableType { TableType::Temporary } +} - async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { - self.get_stream(request).context(TablesRecordBatchSnafu) +type InformationTableRef = Arc; + +struct InformationTableDataSource { + table: InformationTableRef, +} + +impl InformationTableDataSource { + fn new(table: InformationTableRef) -> Self { + Self { table } } } -impl DataSource for InformationTable { +impl DataSource for InformationTableDataSource { fn get_stream( &self, request: ScanRequest, @@ -230,22 +151,23 @@ impl DataSource for InformationTable { let projection = request.projection; let projected_schema = if let Some(projection) = &projection { Arc::new( - self.schema() + self.table + .schema() .try_project(projection) .context(SchemaConversionSnafu) .map_err(BoxedError::new)?, ) } else { - self.schema() + self.table.schema() }; let stream = self - .stream_builder + .table .to_stream() .map_err(BoxedError::new) .context(TablesRecordBatchSnafu) .map_err(BoxedError::new)? 
.map(move |batch| { - batch.and_then(|batch| { + batch.and_then(|batch: common_recordbatch::RecordBatch| { if let Some(projection) = &projection { batch.try_project(projection) } else { diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index be66119539..53b5efc0f3 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -16,8 +16,8 @@ use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::{ - INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, - SEMANTIC_TYPE_TIME_INDEX, + INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, + SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX, }; use common_error::ext::BoxedError; use common_query::physical_plan::TaskContext; @@ -31,9 +31,10 @@ use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::TableId; use super::tables::InformationSchemaTables; -use super::{InformationStreamBuilder, COLUMNS, TABLES}; +use super::{InformationTable, COLUMNS, TABLES}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; @@ -81,7 +82,15 @@ impl InformationSchemaColumns { } } -impl InformationStreamBuilder for InformationSchemaColumns { +impl InformationTable for InformationSchemaColumns { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_COLUMNS_TABLE_ID + } + + fn table_name(&self) -> &'static str { + COLUMNS + } + fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 081f2f03cd..9047aa3e59 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -30,13 
+30,14 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::TableId; use table::metadata::TableType; use super::{COLUMNS, TABLES}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; -use crate::information_schema::InformationStreamBuilder; +use crate::information_schema::InformationTable; use crate::CatalogManager; pub(super) struct InformationSchemaTables { @@ -74,7 +75,15 @@ impl InformationSchemaTables { } } -impl InformationStreamBuilder for InformationSchemaTables { +impl InformationTable for InformationSchemaTables { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_TABLES_TABLE_ID + } + + fn table_name(&self) -> &'static str { + TABLES + } + fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 548ba73627..22157f6665 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -385,7 +385,7 @@ impl CatalogManager for FrontendCatalogManager { let provider = InformationSchemaProvider::new(catalog.to_string(), Arc::downgrade(&manager)); - return provider.table(table_name); + return Ok(provider.table(table_name)); } let key = TableNameKey::new(catalog, schema, table_name); diff --git a/src/store-api/src/data_source.rs b/src/store-api/src/data_source.rs index b178fb41af..3fbef8e08c 100644 --- a/src/store-api/src/data_source.rs +++ b/src/store-api/src/data_source.rs @@ -26,6 +26,4 @@ pub trait DataSource { fn get_stream(&self, request: ScanRequest) -> Result; } -pub type DataSourceRef = Arc; - -pub type TableFactory = Arc DataSourceRef>; +pub type DataSourceRef = Arc; diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index fa2fb5d5b1..a1e525e5a0 100644 --- a/src/table/src/lib.rs +++ 
b/src/table/src/lib.rs @@ -21,6 +21,7 @@ pub mod requests; pub mod stats; pub mod table; pub mod test_util; +pub mod thin_table; pub use store_api::storage::RegionStat; diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 07f43162a8..2892338802 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -34,7 +34,7 @@ pub type TableVersion = u64; /// Indicates whether and how a filter expression can be handled by a /// Table for table scans. -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] pub enum FilterPushDownType { /// The expression cannot be used by the provider. Unsupported, diff --git a/src/table/src/thin_table.rs b/src/table/src/thin_table.rs new file mode 100644 index 0000000000..a22b06cf47 --- /dev/null +++ b/src/table/src/thin_table.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use async_trait::async_trait; +use common_query::prelude::Expr; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::schema::SchemaRef; +use snafu::ResultExt; +use store_api::data_source::DataSourceRef; +use store_api::storage::ScanRequest; + +use crate::error::{Result, TablesRecordBatchSnafu}; +use crate::metadata::{FilterPushDownType, TableInfoRef, TableType}; +use crate::Table; + +/// The `ThinTable` struct will replace the `Table` trait. 
+/// TODO(zhongzc): After completion, perform renaming and documentation work. +pub struct ThinTable { + schema: SchemaRef, + table_info: TableInfoRef, + table_type: TableType, + filter_pushdown: FilterPushDownType, +} + +impl ThinTable { + pub fn new( + schema: SchemaRef, + table_info: TableInfoRef, + table_type: TableType, + filter_pushdown: FilterPushDownType, + ) -> Self { + Self { + schema, + table_info, + table_type, + filter_pushdown, + } + } +} + +pub struct ThinTableAdapter { + table: ThinTable, + data_source: DataSourceRef, +} + +impl ThinTableAdapter { + pub fn new(table: ThinTable, data_source: DataSourceRef) -> Self { + Self { table, data_source } + } +} + +#[async_trait] +impl Table for ThinTableAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.table.schema.clone() + } + + fn table_info(&self) -> TableInfoRef { + self.table.table_info.clone() + } + + fn table_type(&self) -> TableType { + self.table.table_type + } + + async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> { + self.data_source + .get_stream(request) + .context(TablesRecordBatchSnafu) + } + + fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> { + Ok(vec![self.table.filter_pushdown; filters.len()]) + } +}