refactor: add ThinTable to proxy tables from infoschema (#2193)

* refactor: add thin table to proxy tables in info_schema

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix(catalog): fix typo in DataSourceAdapter struct name

* fix: remove redundant Send + Sync

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor(catalog): rename DataSourceAdapter to InformationTableDataSource

* feat(catalog): add ThinTableAdapter for adapting ThinTable to Table interface

* rebase develop

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: default impl for table_type of InformationTable

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: filter_pushdown as table field

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove explicit type declaration

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2023-08-17 23:19:14 +08:00
committed by GitHub
parent b67e5bbf70
commit 87a730658a
8 changed files with 193 additions and 162 deletions
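
Note: the core of the change is that `InformationSchemaProvider` now builds every information_schema table through a single path: resolve an `InformationTable` implementation by name, wrap it in an `InformationTableDataSource`, and pair a metadata-only `ThinTable` with that source through `ThinTableAdapter`. Below is a minimal, self-contained sketch of the lookup step only — the trait is trimmed to one method and every type is a simplified stand-in, not the real greptimedb API:

use std::sync::Arc;

// Stand-in for the real trait, which also carries table_id, schema,
// table_type, and to_stream (see the first diff below).
trait InformationTable {
    fn table_name(&self) -> &'static str;
}

type InformationTableRef = Arc<dyn InformationTable + Send + Sync>;

struct InformationSchemaTables;

impl InformationTable for InformationSchemaTables {
    fn table_name(&self) -> &'static str {
        "tables"
    }
}

struct Provider;

impl Provider {
    // Mirrors the shape of the new InformationSchemaProvider::information_table:
    // resolve the implementation by case-insensitive name. The real provider
    // then wraps the result in a DataSource and a ThinTableAdapter.
    fn information_table(&self, name: &str) -> Option<InformationTableRef> {
        match name.to_ascii_lowercase().as_str() {
            "tables" => Some(Arc::new(InformationSchemaTables) as _),
            _ => None,
        }
    }
}

fn main() {
    let provider = Provider;
    assert!(provider.information_table("TABLES").is_some());
    assert!(provider.information_table("unknown").is_none());
}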

View File

@@ -15,25 +15,23 @@
 mod columns;
 mod tables;
 
-use std::any::Any;
 use std::collections::HashMap;
 use std::sync::{Arc, Weak};
 
-use async_trait::async_trait;
-use common_catalog::consts::{
-    INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME,
-    INFORMATION_SCHEMA_TABLES_TABLE_ID,
-};
+use common_catalog::consts::INFORMATION_SCHEMA_NAME;
 use common_error::ext::BoxedError;
 use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
 use datatypes::schema::SchemaRef;
 use futures_util::StreamExt;
 use snafu::ResultExt;
-use store_api::data_source::{DataSource, TableFactory};
+use store_api::data_source::DataSource;
 use store_api::storage::{ScanRequest, TableId};
 use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
-use table::metadata::{TableIdent, TableInfoBuilder, TableMetaBuilder, TableType};
-use table::{Result as TableResult, Table, TableRef};
+use table::metadata::{
+    FilterPushDownType, TableIdent, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
+};
+use table::thin_table::{ThinTable, ThinTableAdapter};
+use table::TableRef;
 
 use self::columns::InformationSchemaColumns;
 use crate::error::Result;
@@ -62,167 +60,90 @@ impl InformationSchemaProvider {
         catalog_name: String,
         catalog_manager: Weak<dyn CatalogManager>,
     ) -> HashMap<String, TableRef> {
+        let provider = Self::new(catalog_name, catalog_manager);
+
         let mut schema = HashMap::new();
-        schema.insert(
-            TABLES.to_string(),
-            Arc::new(InformationTable::new(
-                catalog_name.clone(),
-                INFORMATION_SCHEMA_TABLES_TABLE_ID,
-                TABLES.to_string(),
-                Arc::new(InformationSchemaTables::new(
-                    catalog_name.clone(),
-                    catalog_manager.clone(),
-                )),
-            )) as _,
-        );
-        schema.insert(
-            COLUMNS.to_string(),
-            Arc::new(InformationTable::new(
-                catalog_name.clone(),
-                INFORMATION_SCHEMA_COLUMNS_TABLE_ID,
-                COLUMNS.to_string(),
-                Arc::new(InformationSchemaColumns::new(catalog_name, catalog_manager)),
-            )) as _,
-        );
+        schema.insert(TABLES.to_owned(), provider.table(TABLES).unwrap());
+        schema.insert(COLUMNS.to_owned(), provider.table(COLUMNS).unwrap());
         schema
     }
 
-    pub fn table(&self, name: &str) -> Result<Option<TableRef>> {
-        let (stream_builder, table_id) = match name.to_ascii_lowercase().as_ref() {
-            TABLES => (
-                Arc::new(InformationSchemaTables::new(
-                    self.catalog_name.clone(),
-                    self.catalog_manager.clone(),
-                )) as _,
-                INFORMATION_SCHEMA_TABLES_TABLE_ID,
-            ),
-            COLUMNS => (
-                Arc::new(InformationSchemaColumns::new(
-                    self.catalog_name.clone(),
-                    self.catalog_manager.clone(),
-                )) as _,
-                INFORMATION_SCHEMA_COLUMNS_TABLE_ID,
-            ),
-            _ => {
-                return Ok(None);
-            }
-        };
-        Ok(Some(Arc::new(InformationTable::new(
-            self.catalog_name.clone(),
-            table_id,
-            name.to_string(),
-            stream_builder,
-        ))))
+    pub fn table(&self, name: &str) -> Option<TableRef> {
+        self.information_table(name).map(|table| {
+            let schema = table.schema();
+            let table_info = Self::table_info(self.catalog_name.clone(), &table);
+            let table_type = table.table_type();
+            let data_source = Arc::new(InformationTableDataSource::new(table));
+            let filter_pushdown = FilterPushDownType::Unsupported;
+            let thin_table = ThinTable::new(schema, table_info, table_type, filter_pushdown);
+            Arc::new(ThinTableAdapter::new(thin_table, data_source)) as _
+        })
     }
 
-    pub fn table_factory(&self, name: &str) -> Result<Option<TableFactory>> {
-        let (stream_builder, table_id) = match name.to_ascii_lowercase().as_ref() {
-            TABLES => (
-                Arc::new(InformationSchemaTables::new(
-                    self.catalog_name.clone(),
-                    self.catalog_manager.clone(),
-                )) as _,
-                INFORMATION_SCHEMA_TABLES_TABLE_ID,
-            ),
-            COLUMNS => (
-                Arc::new(InformationSchemaColumns::new(
-                    self.catalog_name.clone(),
-                    self.catalog_manager.clone(),
-                )) as _,
-                INFORMATION_SCHEMA_COLUMNS_TABLE_ID,
-            ),
-            _ => {
-                return Ok(None);
-            }
-        };
-        let data_source = Arc::new(InformationTable::new(
-            self.catalog_name.clone(),
-            table_id,
-            name.to_string(),
-            stream_builder,
-        ));
-        Ok(Some(Arc::new(move || data_source.clone())))
-    }
-}
-
-// TODO(ruihang): make it a more generic trait:
-// https://github.com/GreptimeTeam/greptimedb/pull/1639#discussion_r1205001903
-pub trait InformationStreamBuilder: Send + Sync {
-    fn to_stream(&self) -> Result<SendableRecordBatchStream>;
-
-    fn schema(&self) -> SchemaRef;
-}
-
-pub struct InformationTable {
-    catalog_name: String,
-    table_id: TableId,
-    name: String,
-    stream_builder: Arc<dyn InformationStreamBuilder>,
-}
-
-impl InformationTable {
-    pub fn new(
-        catalog_name: String,
-        table_id: TableId,
-        name: String,
-        stream_builder: Arc<dyn InformationStreamBuilder>,
-    ) -> Self {
-        Self {
-            catalog_name,
-            table_id,
-            name,
-            stream_builder,
-        }
-    }
-}
+    fn information_table(&self, name: &str) -> Option<InformationTableRef> {
+        match name.to_ascii_lowercase().as_str() {
+            TABLES => Some(Arc::new(InformationSchemaTables::new(
+                self.catalog_name.clone(),
+                self.catalog_manager.clone(),
+            )) as _),
+            COLUMNS => Some(Arc::new(InformationSchemaColumns::new(
+                self.catalog_name.clone(),
+                self.catalog_manager.clone(),
+            )) as _),
+            _ => None,
+        }
+    }
 
-#[async_trait]
-impl Table for InformationTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.stream_builder.schema()
-    }
-
-    fn table_info(&self) -> table::metadata::TableInfoRef {
+    fn table_info(catalog_name: String, table: &InformationTableRef) -> TableInfoRef {
         let table_meta = TableMetaBuilder::default()
-            .schema(self.stream_builder.schema())
+            .schema(table.schema())
             .primary_key_indices(vec![])
             .next_column_id(0)
             .build()
             .unwrap();
-        Arc::new(
-            TableInfoBuilder::default()
-                .ident(TableIdent {
-                    table_id: self.table_id,
-                    version: 0,
-                })
-                .name(self.name.clone())
-                .catalog_name(self.catalog_name.clone())
-                .schema_name(INFORMATION_SCHEMA_NAME.to_string())
-                .meta(table_meta)
-                .table_type(TableType::Temporary)
-                .build()
-                .unwrap(),
-        )
+        let table_info = TableInfoBuilder::default()
+            .ident(TableIdent {
+                table_id: table.table_id(),
+                version: 0,
+            })
+            .name(table.table_name().to_owned())
+            .catalog_name(catalog_name)
+            .schema_name(INFORMATION_SCHEMA_NAME.to_owned())
+            .meta(table_meta)
+            .table_type(table.table_type())
+            .build()
+            .unwrap();
+        Arc::new(table_info)
     }
 }
 
+trait InformationTable {
+    fn table_id(&self) -> TableId;
+
+    fn table_name(&self) -> &'static str;
+
+    fn schema(&self) -> SchemaRef;
+
+    fn to_stream(&self) -> Result<SendableRecordBatchStream>;
+
+    fn table_type(&self) -> TableType {
+        TableType::Temporary
+    }
+}
+
-    async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
-        self.get_stream(request).context(TablesRecordBatchSnafu)
-    }
-}
-
+type InformationTableRef = Arc<dyn InformationTable + Send + Sync>;
+
+struct InformationTableDataSource {
+    table: InformationTableRef,
+}
+
+impl InformationTableDataSource {
+    fn new(table: InformationTableRef) -> Self {
+        Self { table }
+    }
+}
+
-impl DataSource for InformationTable {
+impl DataSource for InformationTableDataSource {
     fn get_stream(
         &self,
         request: ScanRequest,
@@ -230,22 +151,23 @@ impl DataSource for InformationTable {
         let projection = request.projection;
         let projected_schema = if let Some(projection) = &projection {
             Arc::new(
-                self.schema()
+                self.table
+                    .schema()
                     .try_project(projection)
                     .context(SchemaConversionSnafu)
                     .map_err(BoxedError::new)?,
             )
         } else {
-            self.schema()
+            self.table.schema()
        };
 
         let stream = self
-            .stream_builder
+            .table
             .to_stream()
+            .map_err(BoxedError::new)
             .context(TablesRecordBatchSnafu)
             .map_err(BoxedError::new)?
             .map(move |batch| {
-                batch.and_then(|batch| {
+                batch.and_then(|batch: common_recordbatch::RecordBatch| {
                     if let Some(projection) = &projection {
                         batch.try_project(projection)
                     } else {
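
With the `InformationTable` trait in place, adding another virtual table reduces to implementing the trait and registering it in `information_table`. A hedged, self-contained sketch — `InformationSchemaMemory`, its id, and its rows are hypothetical, and the types are trimmed stand-ins rather than the real ones:

use std::sync::Arc;

// Trimmed stand-ins for SchemaRef, the stream type, and TableId.
type SchemaRef = Arc<Vec<&'static str>>;
type Stream = Vec<Vec<String>>;
type TableId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableType {
    Temporary,
}

trait InformationTable {
    fn table_id(&self) -> TableId;
    fn table_name(&self) -> &'static str;
    fn schema(&self) -> SchemaRef;
    fn to_stream(&self) -> Stream;

    // Default shared by all information_schema tables.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }
}

// Hypothetical new virtual table, not part of this diff.
struct InformationSchemaMemory {
    schema: SchemaRef,
}

impl InformationTable for InformationSchemaMemory {
    fn table_id(&self) -> TableId {
        1024 // hypothetical table id
    }

    fn table_name(&self) -> &'static str {
        "memory"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self) -> Stream {
        vec![vec!["heap_bytes".to_string(), "42".to_string()]]
    }

    // table_type() is inherited from the trait default, exactly as
    // InformationSchemaTables and InformationSchemaColumns do below.
}

fn main() {
    let table = InformationSchemaMemory {
        schema: Arc::new(vec!["metric", "value"]),
    };
    assert_eq!(table.table_type(), TableType::Temporary);
    assert_eq!(table.to_stream().len(), 1);
}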

View File

@@ -16,8 +16,8 @@ use std::sync::{Arc, Weak};
 
 use arrow_schema::SchemaRef as ArrowSchemaRef;
 use common_catalog::consts::{
-    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
-    SEMANTIC_TYPE_TIME_INDEX,
+    INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD,
+    SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
 };
 use common_error::ext::BoxedError;
 use common_query::physical_plan::TaskContext;
@@ -31,9 +31,10 @@ use datatypes::scalars::ScalarVectorBuilder;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use datatypes::vectors::{StringVectorBuilder, VectorRef};
 use snafu::{OptionExt, ResultExt};
+use store_api::storage::TableId;
 
 use super::tables::InformationSchemaTables;
-use super::{InformationStreamBuilder, COLUMNS, TABLES};
+use super::{InformationTable, COLUMNS, TABLES};
 use crate::error::{
     CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
 };
@@ -81,7 +82,15 @@ impl InformationSchemaColumns {
     }
 }
 
-impl InformationStreamBuilder for InformationSchemaColumns {
+impl InformationTable for InformationSchemaColumns {
+    fn table_id(&self) -> TableId {
+        INFORMATION_SCHEMA_COLUMNS_TABLE_ID
+    }
+
+    fn table_name(&self) -> &'static str {
+        COLUMNS
+    }
+
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }

View File

@@ -30,13 +30,14 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
 use snafu::{OptionExt, ResultExt};
+use store_api::storage::TableId;
 use table::metadata::TableType;
 
 use super::{COLUMNS, TABLES};
 use crate::error::{
     CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
 };
-use crate::information_schema::InformationStreamBuilder;
+use crate::information_schema::InformationTable;
 use crate::CatalogManager;
 
 pub(super) struct InformationSchemaTables {
@@ -74,7 +75,15 @@ impl InformationSchemaTables {
     }
 }
 
-impl InformationStreamBuilder for InformationSchemaTables {
+impl InformationTable for InformationSchemaTables {
+    fn table_id(&self) -> TableId {
+        INFORMATION_SCHEMA_TABLES_TABLE_ID
+    }
+
+    fn table_name(&self) -> &'static str {
+        TABLES
+    }
+
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }

View File

@@ -385,7 +385,7 @@ impl CatalogManager for FrontendCatalogManager {
             let provider =
                 InformationSchemaProvider::new(catalog.to_string(), Arc::downgrade(&manager));
 
-            return provider.table(table_name);
+            return Ok(provider.table(table_name));
         }
 
         let key = TableNameKey::new(catalog, schema, table_name);

View File

@@ -26,6 +26,4 @@ pub trait DataSource {
     fn get_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError>;
 }
 
-pub type DataSourceRef = Arc<dyn DataSource>;
-
-pub type TableFactory = Arc<dyn Fn() -> DataSourceRef>;
+pub type DataSourceRef = Arc<dyn DataSource + Send + Sync>;
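
The `+ Send + Sync` bounds on the trait object are what make a `DataSourceRef` shareable across threads and async tasks. A self-contained miniature (the `Rows` source is hypothetical and the trait is trimmed to a single method):

use std::sync::Arc;
use std::thread;

trait DataSource {
    fn get_stream(&self) -> Vec<u64>;
}

type DataSourceRef = Arc<dyn DataSource + Send + Sync>;

// Hypothetical in-memory source, for illustration only.
struct Rows(Vec<u64>);

impl DataSource for Rows {
    fn get_stream(&self) -> Vec<u64> {
        self.0.clone()
    }
}

fn main() {
    let source: DataSourceRef = Arc::new(Rows(vec![1, 2, 3]));
    let cloned = source.clone();
    // Without `+ Send + Sync` on the trait object, moving this Arc into
    // another thread would not compile.
    let handle = thread::spawn(move || cloned.get_stream());
    assert_eq!(handle.join().unwrap(), source.get_stream());
}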

View File

@@ -21,6 +21,7 @@ pub mod requests;
 pub mod stats;
 pub mod table;
 pub mod test_util;
+pub mod thin_table;
 
 pub use store_api::storage::RegionStat;

View File

@@ -34,7 +34,7 @@ pub type TableVersion = u64;
 
 /// Indicates whether and how a filter expression can be handled by a
 /// Table for table scans.
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
 pub enum FilterPushDownType {
     /// The expression cannot be used by the provider.
     Unsupported,
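
Deriving `Copy` matters for the new `ThinTableAdapter::supports_filters_pushdown` (last file below): `vec![self.table.filter_pushdown; filters.len()]` reads the field out of `&self`, which compiles without an explicit `.clone()` only when the type is `Copy`. A miniature of the pattern — the `Inexact` and `Exact` variants are assumed from the usual pushdown scheme, since only `Unsupported` is visible in this hunk:

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterPushDownType {
    Unsupported,
    Inexact,
    Exact,
}

struct ThinTable {
    filter_pushdown: FilterPushDownType,
}

impl ThinTable {
    // Because FilterPushDownType is Copy, the field copies out of &self,
    // and vec![elem; n] repeats it once per filter.
    fn supports_filters_pushdown(&self, n_filters: usize) -> Vec<FilterPushDownType> {
        vec![self.filter_pushdown; n_filters]
    }
}

fn main() {
    let t = ThinTable {
        filter_pushdown: FilterPushDownType::Unsupported,
    };
    assert_eq!(t.supports_filters_pushdown(2).len(), 2);
}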

View File

@@ -0,0 +1,92 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+
+use async_trait::async_trait;
+use common_query::prelude::Expr;
+use common_recordbatch::SendableRecordBatchStream;
+use datatypes::schema::SchemaRef;
+use snafu::ResultExt;
+use store_api::data_source::DataSourceRef;
+use store_api::storage::ScanRequest;
+
+use crate::error::{Result, TablesRecordBatchSnafu};
+use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
+use crate::Table;
+
+/// The `ThinTable` struct will replace the `Table` trait.
+/// TODO(zhongzc): After completion, perform renaming and documentation work.
+pub struct ThinTable {
+    schema: SchemaRef,
+    table_info: TableInfoRef,
+    table_type: TableType,
+    filter_pushdown: FilterPushDownType,
+}
+
+impl ThinTable {
+    pub fn new(
+        schema: SchemaRef,
+        table_info: TableInfoRef,
+        table_type: TableType,
+        filter_pushdown: FilterPushDownType,
+    ) -> Self {
+        Self {
+            schema,
+            table_info,
+            table_type,
+            filter_pushdown,
+        }
+    }
+}
+
+pub struct ThinTableAdapter {
+    table: ThinTable,
+    data_source: DataSourceRef,
+}
+
+impl ThinTableAdapter {
+    pub fn new(table: ThinTable, data_source: DataSourceRef) -> Self {
+        Self { table, data_source }
+    }
+}
+
+#[async_trait]
+impl Table for ThinTableAdapter {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.table.schema.clone()
+    }
+
+    fn table_info(&self) -> TableInfoRef {
+        self.table.table_info.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        self.table.table_type
+    }
+
+    async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
+        self.data_source
+            .get_stream(request)
+            .context(TablesRecordBatchSnafu)
+    }
+
+    fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
+        Ok(vec![self.table.filter_pushdown; filters.len()])
+    }
+}
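
Putting the new module together: a metadata-only `ThinTable` plus any `DataSource` yields a full table implementation. A hedged end-to-end sketch — `MockSource` and all types here are simplified stand-ins for the real greptimedb ones, and the real `get_stream` takes a `ScanRequest`:

use std::sync::Arc;

// Trimmed stand-ins; the real ThinTable also carries TableInfoRef,
// TableType, and FilterPushDownType.
type SchemaRef = Arc<Vec<&'static str>>;
type Stream = Vec<Vec<String>>;

trait DataSource {
    fn get_stream(&self) -> Stream;
}

type DataSourceRef = Arc<dyn DataSource + Send + Sync>;

struct ThinTable {
    schema: SchemaRef,
}

struct ThinTableAdapter {
    table: ThinTable,
    data_source: DataSourceRef,
}

impl ThinTableAdapter {
    // Metadata queries are answered from ThinTable; scans are delegated
    // to the injected DataSource.
    fn schema(&self) -> SchemaRef {
        self.table.schema.clone()
    }

    fn scan(&self) -> Stream {
        self.data_source.get_stream()
    }
}

// Hypothetical in-memory source standing in for InformationTableDataSource.
struct MockSource;

impl DataSource for MockSource {
    fn get_stream(&self) -> Stream {
        vec![vec!["greptime".to_string(), "public".to_string()]]
    }
}

fn main() {
    let thin = ThinTable {
        schema: Arc::new(vec!["catalog_name", "schema_name"]),
    };
    let adapter = ThinTableAdapter {
        table: thin,
        data_source: Arc::new(MockSource),
    };
    assert_eq!(adapter.schema().len(), 2);
    assert_eq!(adapter.scan().len(), 1);
}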