From f8d152231d7270b8d597b33fc5d76cfa0dbe5fc5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 7 Aug 2023 16:07:25 +0800 Subject: [PATCH] feat(information_schema): implement `table_factory` method (#2108) * feat(information_schema): implement table_factory method * refactor(catalog): simplify table_factory method * Update src/table/src/data_source.rs Co-authored-by: Ruihang Xia --------- Co-authored-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 27 +++++++++++++++++++++++++ src/catalog/src/lib.rs | 1 + src/catalog/src/table_factory.rs | 19 ++++++++++++++++++ src/table/src/data_source.rs | 29 +++++++++++++++++++++++++++ src/table/src/lib.rs | 1 + 5 files changed, 77 insertions(+) create mode 100644 src/catalog/src/table_factory.rs create mode 100644 src/table/src/data_source.rs diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 0588675901..056205d5db 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -25,6 +25,7 @@ use datatypes::schema::SchemaRef; use futures_util::StreamExt; use snafu::ResultExt; use store_api::storage::ScanRequest; +use table::data_source::DataSource; use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu}; use table::metadata::TableType; use table::{Result as TableResult, Table, TableRef}; @@ -32,6 +33,7 @@ use table::{Result as TableResult, Table, TableRef}; use self::columns::InformationSchemaColumns; use crate::error::Result; use crate::information_schema::tables::InformationSchemaTables; +use crate::table_factory::TableFactory; use crate::CatalogManager; const TABLES: &str = "tables"; @@ -69,6 +71,25 @@ impl InformationSchemaProvider { Ok(Some(Arc::new(InformationTable::new(stream_builder)))) } + + pub fn table_factory(&self, name: &str) -> Result> { + let stream_builder = match name.to_ascii_lowercase().as_ref() { + TABLES => Arc::new(InformationSchemaTables::new( + self.catalog_name.clone(), + self.catalog_manager.clone(), + )) as _, + COLUMNS => Arc::new(InformationSchemaColumns::new( + self.catalog_name.clone(), + self.catalog_manager.clone(), + )) as _, + _ => { + return Ok(None); + } + }; + let data_source = Arc::new(InformationTable::new(stream_builder)); + + Ok(Some(Arc::new(move || data_source.clone()))) + } } // TODO(ruihang): make it a more generic trait: @@ -108,6 +129,12 @@ impl Table for InformationTable { } async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + self.get_stream(request) + } +} + +impl DataSource for InformationTable { + fn get_stream(&self, request: ScanRequest) -> TableResult { let projection = request.projection; let projected_schema = if let Some(projection) = &projection { Arc::new( diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 2f808f697c..02d8dfdd1d 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -37,6 +37,7 @@ pub mod local; mod metrics; pub mod remote; pub mod system; +pub mod table_factory; pub mod table_source; pub mod tables; diff --git a/src/catalog/src/table_factory.rs b/src/catalog/src/table_factory.rs new file mode 100644 index 0000000000..dc34e7e4c0 --- /dev/null +++ b/src/catalog/src/table_factory.rs @@ -0,0 +1,19 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use table::data_source::DataSourceRef; + +pub type TableFactory = Arc DataSourceRef>; diff --git a/src/table/src/data_source.rs b/src/table/src/data_source.rs new file mode 100644 index 0000000000..5c0acd4e49 --- /dev/null +++ b/src/table/src/data_source.rs @@ -0,0 +1,29 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_recordbatch::SendableRecordBatchStream; +use store_api::storage::ScanRequest; + +use crate::error::Result; + +/// This trait represents a common data source abstraction which provides an interface +/// for retrieving data in the form of a stream of record batches. +pub trait DataSource { + /// Retrieves a stream of record batches based on the provided scan request. + fn get_stream(&self, request: ScanRequest) -> Result; +} + +pub type DataSourceRef = Arc; diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index fa2fb5d5b1..edfdc2af1b 100644 --- a/src/table/src/lib.rs +++ b/src/table/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] +pub mod data_source; pub mod engine; pub mod error; pub mod metadata;