refactor(table): cleanup dist table (#2255)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Author: Zhenchi
Committed: 2023-08-25 14:37:39 +08:00 (by GitHub)
Parent: 6bf260a05c
Commit: 9d87c8b6de

8 changed files with 31 additions and 348 deletions


@@ -70,11 +70,9 @@ impl InformationSchemaProvider {
     pub fn table(&self, name: &str) -> Option<TableRef> {
         self.information_table(name).map(|table| {
-            let schema = table.schema();
             let table_info = Self::table_info(self.catalog_name.clone(), &table);
-            let table_type = table.table_type();
             let filter_pushdown = FilterPushDownType::Unsupported;
-            let thin_table = ThinTable::new(schema, table_info, table_type, filter_pushdown);
+            let thin_table = ThinTable::new(table_info, filter_pushdown);
             let data_source = Arc::new(InformationTableDataSource::new(table));
             Arc::new(ThinTableAdapter::new(thin_table, data_source)) as _
         })
     }
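Note: the two dropped arguments were redundant. As the ThinTableAdapter hunk later in this diff shows, both values already live in TableInfoRef (table_info.meta.schema and table_info.table_type). A minimal sketch of the equivalence, assuming a table_info: TableInfoRef is in scope:

    // Sketch only: the values previously passed separately are derived from table_info.
    let schema = table_info.meta.schema.clone(); // was: table.schema()
    let table_type = table_info.table_type;      // was: table.table_type()
    let thin_table = ThinTable::new(table_info, FilterPushDownType::Unsupported);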


@@ -38,7 +38,6 @@ use common_meta::key::table_info::TableInfoKey;
 use common_meta::key::table_name::TableNameKey;
 use common_meta::key::{TableMetaKey, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
-use common_meta::table_name::TableName;
 use common_telemetry::debug;
 use futures_util::TryStreamExt;
 use partition::manager::PartitionRuleManagerRef;

@@ -417,12 +416,7 @@ impl CatalogManager for FrontendCatalogManager {
             .try_into()
             .context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
         );
-        let table = Arc::new(DistTable::new(
-            TableName::new(catalog, schema, table_name),
-            table_info,
-            Arc::new(self.clone()),
-        ));
-        Ok(Some(table))
+        Ok(Some(DistTable::table(table_info)))
     }

     fn as_any(&self) -> &dyn Any {


@@ -129,11 +129,7 @@ impl DistInstance {
         create_table.table_id = Some(api::v1::TableId { id: table_id });

-        let table = Arc::new(DistTable::new(
-            table_name.clone(),
-            table_info,
-            self.catalog_manager.clone(),
-        ));
+        let table = DistTable::table(table_info);

         let request = RegisterTableRequest {
             catalog: table_name.catalog_name.clone(),
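Both call sites above collapse to the same shape: DistTable::table needs only the TableInfoRef, with no TableName and no catalog-manager handle. A hedged sketch of the new call, assuming a table_info: TableInfoRef is in scope:

    // The constructor now returns a ready-to-register TableRef directly.
    let table: TableRef = DistTable::table(table_info);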


@@ -12,228 +12,41 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::any::Any;
 use std::sync::Arc;

-use async_trait::async_trait;
-use client::Database;
 use common_error::ext::BoxedError;
-use common_meta::table_name::TableName;
-use common_query::error::Result as QueryResult;
-use common_query::logical_plan::Expr;
-use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
-use common_query::Output;
-use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
-use common_recordbatch::error::{
-    ExternalSnafu as RecordBatchExternalSnafu, Result as RecordBatchResult,
-};
-use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
-use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::Partitioning;
-use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
-use futures_util::StreamExt;
-use snafu::prelude::*;
+use common_recordbatch::SendableRecordBatchStream;
+use store_api::data_source::DataSource;
 use store_api::storage::ScanRequest;
-use table::error::TableOperationSnafu;
-use table::metadata::{FilterPushDownType, TableInfoRef, TableType};
-use table::Table;
+use table::metadata::{FilterPushDownType, TableInfoRef};
+use table::thin_table::{ThinTable, ThinTableAdapter};
+use table::TableRef;

-use crate::catalog::FrontendCatalogManager;
-use crate::table::scan::{DatanodeInstance, TableScanPlan};
+use crate::error::NotSupportedSnafu;

 pub mod delete;
 pub mod insert;
-pub(crate) mod scan;

 #[derive(Clone)]
-pub struct DistTable {
-    table_name: TableName,
-    table_info: TableInfoRef,
-    catalog_manager: Arc<FrontendCatalogManager>,
-}
-
-#[async_trait]
-impl Table for DistTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.table_info.meta.schema.clone()
-    }
-
-    fn table_info(&self) -> TableInfoRef {
-        self.table_info.clone()
-    }
-
-    fn table_type(&self) -> TableType {
-        self.table_info.table_type
-    }
-
-    // TODO(ruihang): DistTable should not call this method directly
-    async fn scan_to_stream(
-        &self,
-        request: ScanRequest,
-    ) -> table::Result<SendableRecordBatchStream> {
-        let partition_manager = self.catalog_manager.partition_manager();
-        let datanode_clients = self.catalog_manager.datanode_clients();
-
-        let table_id = self.table_info.table_id();
-        let partition_rule = partition_manager
-            .find_table_partition_rule(table_id)
-            .await
-            .map_err(BoxedError::new)
-            .context(TableOperationSnafu)?;
-
-        let regions = partition_manager
-            .find_regions_by_filters(partition_rule, &request.filters)
-            .map_err(BoxedError::new)
-            .context(TableOperationSnafu)?;
-        let datanodes = partition_manager
-            .find_region_datanodes(table_id, regions)
-            .await
-            .map_err(BoxedError::new)
-            .context(TableOperationSnafu)?;
-
-        let table_name = &self.table_name;
-        let mut partition_execs = Vec::with_capacity(datanodes.len());
-        for (datanode, _regions) in datanodes.iter() {
-            let client = datanode_clients.get_client(datanode).await;
-            let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
-            let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
-
-            partition_execs.push(Arc::new(PartitionExec {
-                table_name: table_name.clone(),
-                datanode_instance,
-                projection: request.projection.clone(),
-                filters: request.filters.clone(),
-                limit: request.limit,
-            }));
-        }
-
-        let stream = Box::pin(async_stream::stream!({
-            for partition_exec in partition_execs {
-                let mut stream = partition_exec.scan_to_stream().await?;
-                while let Some(record_batch) = stream.next().await {
-                    yield record_batch;
-                }
-            }
-        }));
-
-        let schema = project_schema(self.schema(), request.projection.as_ref());
-        let stream = RecordBatchStreamAdaptor {
-            schema,
-            stream,
-            output_ordering: None,
-        };
-
-        Ok(Box::pin(stream))
-    }
-
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> table::Result<Vec<FilterPushDownType>> {
-        Ok(vec![FilterPushDownType::Inexact; filters.len()])
-    }
-}
+pub struct DistTable;

 impl DistTable {
-    pub fn new(
-        table_name: TableName,
-        table_info: TableInfoRef,
-        catalog_manager: Arc<FrontendCatalogManager>,
-    ) -> Self {
-        Self {
-            table_name,
-            table_info,
-            catalog_manager,
-        }
+    pub fn table(table_info: TableInfoRef) -> TableRef {
+        let thin_table = ThinTable::new(table_info, FilterPushDownType::Inexact);
+        let data_source = Arc::new(DummyDataSource);
+        Arc::new(ThinTableAdapter::new(thin_table, data_source))
     }
 }

-fn project_schema(table_schema: SchemaRef, projection: Option<&Vec<usize>>) -> SchemaRef {
-    if let Some(projection) = projection {
-        let columns = table_schema.column_schemas();
-        let projected = projection
-            .iter()
-            .map(|x| columns[*x].clone())
-            .collect::<Vec<ColumnSchema>>();
-        Arc::new(Schema::new(projected))
-    } else {
-        table_schema
-    }
-}
-
-#[derive(Debug)]
-struct DistTableScan {
-    schema: SchemaRef,
-    partition_execs: Vec<Arc<PartitionExec>>,
-}
-
-impl PhysicalPlan for DistTableScan {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partition_execs.len())
-    }
-
-    fn children(&self) -> Vec<PhysicalPlanRef> {
-        vec![]
-    }
-
-    fn with_new_children(&self, _children: Vec<PhysicalPlanRef>) -> QueryResult<PhysicalPlanRef> {
-        unimplemented!()
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        _context: Arc<TaskContext>,
-    ) -> QueryResult<SendableRecordBatchStream> {
-        let exec = self.partition_execs[partition].clone();
-        let stream = Box::pin(async move { exec.scan_to_stream().await });
-        let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream);
-        Ok(Box::pin(stream))
-    }
-}
-
-#[derive(Debug)]
-struct PartitionExec {
-    table_name: TableName,
-    datanode_instance: DatanodeInstance,
-    projection: Option<Vec<usize>>,
-    filters: Vec<Expr>,
-    limit: Option<usize>,
-}
-
-impl PartitionExec {
-    async fn scan_to_stream(&self) -> RecordBatchResult<SendableRecordBatchStream> {
-        let plan: TableScanPlan = TableScanPlan {
-            table_name: self.table_name.clone(),
-            projection: self.projection.clone(),
-            filters: self.filters.clone(),
-            limit: self.limit,
-        };
-        let output = self
-            .datanode_instance
-            .grpc_table_scan(plan)
-            .await
-            .map_err(BoxedError::new)
-            .context(RecordBatchExternalSnafu)?;
-        let Output::Stream(stream) = output else {
-            unreachable!()
-        };
-        Ok(stream)
-    }
-}
+pub struct DummyDataSource;
+
+impl DataSource for DummyDataSource {
+    fn get_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError> {
+        NotSupportedSnafu {
+            feat: "get stream from a distributed table",
+        }
+        .fail()
+        .map_err(BoxedError::new)
+    }
+}
@@ -245,6 +58,8 @@ pub(crate) mod test {
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute};
+    use common_meta::table_name::TableName;
+    use common_query::prelude::Expr;
     use datafusion_expr::expr_fn::{and, binary_expr, col, or};
     use datafusion_expr::{lit, Operator};
     use meta_client::client::MetaClient;
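The rewrite above splits the old monolithic Table impl in two: ThinTable/ThinTableAdapter answer all metadata queries, while the DataSource carries (or, for DummyDataSource, refuses) the scan path. A sketch of what a caller observes now, assuming ScanRequest implements Default and a table_info: TableInfoRef is in scope:

    // Sketch only: scanning a DistTable surfaces the NotSupported error from
    // DummyDataSource instead of fanning out to datanodes as the old code did.
    let table = DistTable::table(table_info);
    let result = table.scan_to_stream(ScanRequest::default()).await;
    assert!(result.is_err());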


@@ -1,104 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::fmt::Formatter;
-use std::sync::Arc;
-
-use client::Database;
-use common_meta::table_name::TableName;
-use common_query::prelude::Expr;
-use common_query::Output;
-use datafusion::datasource::DefaultTableSource;
-use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};
-use snafu::ResultExt;
-use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
-use table::table::adapter::DfTableProviderAdapter;
-use table::TableRef;
-
-use crate::error::{self, Result};
-
-#[derive(Clone)]
-pub struct DatanodeInstance {
-    table: TableRef,
-    db: Database,
-}
-
-impl std::fmt::Debug for DatanodeInstance {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.write_str("DatanodeInstance")
-    }
-}
-
-impl DatanodeInstance {
-    pub(crate) fn new(table: TableRef, db: Database) -> Self {
-        Self { table, db }
-    }
-
-    pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<Output> {
-        let logical_plan = self.build_logical_plan(&plan)?;
-        let substrait_plan = DFLogicalSubstraitConvertor
-            .encode(&logical_plan)
-            .context(error::EncodeSubstraitLogicalPlanSnafu)?;
-        self.db
-            .logical_plan(substrait_plan.to_vec(), None)
-            .await
-            .context(error::RequestDatanodeSnafu)
-    }
-
-    fn build_logical_plan(&self, table_scan: &TableScanPlan) -> Result<LogicalPlan> {
-        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
-        let mut builder = LogicalPlanBuilder::scan_with_filters(
-            table_scan.table_name.to_string(),
-            Arc::new(DefaultTableSource::new(table_provider)),
-            table_scan.projection.clone(),
-            table_scan
-                .filters
-                .iter()
-                .map(|x| x.df_expr().clone())
-                .collect::<Vec<_>>(),
-        )
-        .context(error::BuildDfLogicalPlanSnafu)?;
-
-        if let Some(filter) = table_scan
-            .filters
-            .iter()
-            .map(|x| x.df_expr())
-            .cloned()
-            .reduce(|accum, expr| accum.and(expr))
-        {
-            builder = builder
-                .filter(filter)
-                .context(error::BuildDfLogicalPlanSnafu)?;
-        }
-
-        if table_scan.limit.is_some() {
-            builder = builder
-                .limit(0, table_scan.limit)
-                .context(error::BuildDfLogicalPlanSnafu)?;
-        }
-
-        builder.build().context(error::BuildDfLogicalPlanSnafu)
-    }
-}
-
-#[derive(Debug)]
-pub(crate) struct TableScanPlan {
-    pub table_name: TableName,
-    pub projection: Option<Vec<usize>>,
-    pub filters: Vec<Expr>,
-    pub limit: Option<usize>,
-}
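The deleted build_logical_plan folded every pushed-down filter into a single predicate with AND before attaching it to the scan. A self-contained sketch of that step using datafusion_expr directly (column names are illustrative):

    use datafusion_expr::{col, lit, Expr};

    // Combine N filters into one predicate: f1 AND f2 AND ... AND fN.
    let filters = vec![col("a").gt(lit(1)), col("b").lt(lit(5))];
    let combined: Option<Expr> = filters
        .into_iter()
        .reduce(|accum, expr| accum.and(expr));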


@@ -48,14 +48,11 @@ impl NumbersTable {
     }

     pub fn table_with_name(table_id: TableId, name: String) -> TableRef {
-        let schema = Self::schema();
         let thin_table = ThinTable::new(
-            schema.clone(),
             Self::table_info(table_id, name, "test_engine".to_string()),
-            TableType::Temporary,
             FilterPushDownType::Unsupported,
         );
-        let data_source = Arc::new(NumbersDataSource::new(schema));
+        let data_source = Arc::new(NumbersDataSource::new(Self::schema()));
         Arc::new(ThinTableAdapter::new(thin_table, data_source))
     }


@@ -29,23 +29,14 @@ use crate::Table;
 /// The `ThinTable` struct will replace the `Table` trait.
 /// TODO(zhongzc): After completion, perform renaming and documentation work.
 pub struct ThinTable {
-    schema: SchemaRef,
     table_info: TableInfoRef,
-    table_type: TableType,
     filter_pushdown: FilterPushDownType,
 }

 impl ThinTable {
-    pub fn new(
-        schema: SchemaRef,
-        table_info: TableInfoRef,
-        table_type: TableType,
-        filter_pushdown: FilterPushDownType,
-    ) -> Self {
+    pub fn new(table_info: TableInfoRef, filter_pushdown: FilterPushDownType) -> Self {
         Self {
-            schema,
             table_info,
-            table_type,
             filter_pushdown,
         }
     }

@@ -69,7 +60,7 @@ impl Table for ThinTableAdapter {
     }

     fn schema(&self) -> SchemaRef {
-        self.table.schema.clone()
+        self.table.table_info.meta.schema.clone()
     }

     fn table_info(&self) -> TableInfoRef {

@@ -77,7 +68,7 @@ impl Table for ThinTableAdapter {
     }

     fn table_type(&self) -> TableType {
-        self.table.table_type
+        self.table.table_info.table_type
     }

     async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
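With the struct slimmed down, the adapter reads both answers through table_info, so the schema and table type can no longer drift from the table metadata. A minimal sketch of the invariant, assuming table_info: TableInfoRef and a data_source are in scope:

    // Sketch only: a single TableInfoRef now fully determines the adapter's answers.
    let thin_table = ThinTable::new(table_info.clone(), FilterPushDownType::Unsupported);
    let adapter = ThinTableAdapter::new(thin_table, data_source);
    assert_eq!(adapter.schema(), table_info.meta.schema);
    assert_eq!(adapter.table_type(), table_info.table_type);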


@@ -30,12 +30,10 @@ mod test {
     use common_query::Output;
     use common_recordbatch::RecordBatches;
     use frontend::instance::Instance;
-    use frontend::table::DistTable;
     use query::parser::QueryLanguageParser;
     use servers::query_handler::grpc::GrpcQueryHandler;
     use session::context::QueryContext;
     use store_api::storage::RegionNumber;
-    use table::Table;
     use tests::{has_parquet_file, test_region_dir};

     use crate::tests;

@@ -332,7 +330,6 @@ CREATE TABLE {table_name} (
         .await
         .unwrap()
         .unwrap();
-    let table = table.as_any().downcast_ref::<DistTable>().unwrap();
     let table_id = table.table_info().table_id();

     let table_route_value = instance

@@ -627,7 +624,6 @@ CREATE TABLE {table_name} (
         .await
         .unwrap()
         .unwrap();
-    let table = table.as_any().downcast_ref::<DistTable>().unwrap();
     let table_id = table.table_info().ident.table_id;
     let table_route_value = instance
         .table_metadata_manager()