feat(information_schema): implement table_factory method (#2108)

* feat(information_schema): implement table_factory method

* refactor(catalog): simplify table_factory method

* Update src/table/src/data_source.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Zhenchi
2023-08-07 16:07:25 +08:00
committed by GitHub
parent c8cb1ef5bc
commit f8d152231d
5 changed files with 77 additions and 0 deletions

View File

@@ -25,6 +25,7 @@ use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::data_source::DataSource;
use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
use table::metadata::TableType;
use table::{Result as TableResult, Table, TableRef};
@@ -32,6 +33,7 @@ use table::{Result as TableResult, Table, TableRef};
use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::tables::InformationSchemaTables;
use crate::table_factory::TableFactory;
use crate::CatalogManager;
const TABLES: &str = "tables";
@@ -69,6 +71,25 @@ impl InformationSchemaProvider {
Ok(Some(Arc::new(InformationTable::new(stream_builder))))
}
pub fn table_factory(&self, name: &str) -> Result<Option<TableFactory>> {
let stream_builder = match name.to_ascii_lowercase().as_ref() {
TABLES => Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _,
COLUMNS => Arc::new(InformationSchemaColumns::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _,
_ => {
return Ok(None);
}
};
let data_source = Arc::new(InformationTable::new(stream_builder));
Ok(Some(Arc::new(move || data_source.clone())))
}
}
// TODO(ruihang): make it a more generic trait:
@@ -108,6 +129,12 @@ impl Table for InformationTable {
}
async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
self.get_stream(request)
}
}
impl DataSource for InformationTable {
fn get_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
let projection = request.projection;
let projected_schema = if let Some(projection) = &projection {
Arc::new(

View File

@@ -37,6 +37,7 @@ pub mod local;
mod metrics;
pub mod remote;
pub mod system;
pub mod table_factory;
pub mod table_source;
pub mod tables;

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use table::data_source::DataSourceRef;
pub type TableFactory = Arc<dyn Fn() -> DataSourceRef>;

View File

@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use store_api::storage::ScanRequest;
use crate::error::Result;
/// This trait represents a common data source abstraction which provides an interface
/// for retrieving data in the form of a stream of record batches.
pub trait DataSource {
/// Retrieves a stream of record batches based on the provided scan request.
fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
}
pub type DataSourceRef = Arc<dyn DataSource>;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
pub mod data_source;
pub mod engine;
pub mod error;
pub mod metadata;