From d6c82867d52e295e0370071ddb8098e906d3b416 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 1 Sep 2023 11:34:03 +0800 Subject: [PATCH] refactor: remove the most Table impls (#2274) Signed-off-by: Zhenchi Co-authored-by: Ruihang Xia --- src/catalog/src/remote/mock.rs | 4 +- src/catalog/src/remote/region_alive_keeper.rs | 4 +- src/catalog/tests/remote_catalog_tests.rs | 8 +- src/file-table-engine/src/engine/immutable.rs | 8 +- src/file-table-engine/src/engine/tests.rs | 8 +- src/file-table-engine/src/table/immutable.rs | 140 ++++++++++-------- src/promql/src/planner.rs | 2 +- src/query/src/range_select/plan_rewrite.rs | 2 +- src/query/src/sql.rs | 2 +- src/query/src/tests.rs | 7 +- src/query/src/tests/function.rs | 2 +- src/query/src/tests/my_sum_udaf_example.rs | 2 +- src/query/src/tests/percentile_test.rs | 2 +- src/query/src/tests/query_engine_test.rs | 2 +- src/query/src/tests/time_range_filter_test.rs | 89 +++++------ src/servers/src/prom_store.rs | 2 +- src/servers/tests/grpc/mod.rs | 3 +- src/servers/tests/mod.rs | 11 +- src/servers/tests/mysql/mysql_server_test.rs | 13 +- src/servers/tests/postgres/mod.rs | 3 +- src/table/src/test_util/empty_table.rs | 65 ++++---- src/table/src/test_util/memtable.rs | 100 +++++-------- src/table/src/test_util/mock_engine.rs | 3 +- src/table/src/thin_table.rs | 4 + 24 files changed, 219 insertions(+), 267 deletions(-) diff --git a/src/catalog/src/remote/mock.rs b/src/catalog/src/remote/mock.rs index 2f98ea018b..4e88f54ce4 100644 --- a/src/catalog/src/remote/mock.rs +++ b/src/catalog/src/remote/mock.rs @@ -55,14 +55,14 @@ impl TableEngine for MockTableEngine { let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _]; let record_batch = RecordBatch::new(schema, data).unwrap(); - let table: TableRef = Arc::new(MemTable::new_with_catalog( + let table = MemTable::new_with_catalog( &request.table_name, record_batch, table_id, request.catalog_name, request.schema_name, vec![0], - )) as Arc<_>; + ); let mut tables = self.tables.write().unwrap(); let _ = tables.insert(table_id, table.clone() as TableRef); diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index f643487ec7..dff500b0f1 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -561,7 +561,7 @@ mod test { table_id: 1, engine: "MockTableEngine".to_string(), }; - let table = Arc::new(EmptyTable::new(CreateTableRequest { + let table = EmptyTable::table(CreateTableRequest { id: 1, catalog_name: catalog.to_string(), schema_name: schema.to_string(), @@ -577,7 +577,7 @@ mod test { create_if_not_exists: false, table_options: TableOptions::default(), engine: "MockTableEngine".to_string(), - })); + }); let catalog_manager = MemoryCatalogManager::new_with_table(table.clone()); keepers .register_table(table_ident.clone(), table, catalog_manager) diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index a6ad6761e2..d7d943c467 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -379,7 +379,7 @@ mod tests { schema: table_before.schema.clone(), table_name: table_before.table.clone(), table_id: table_before.table_id, - table: Arc::new(EmptyTable::new(CreateTableRequest { + table: EmptyTable::table(CreateTableRequest { id: table_before.table_id, catalog_name: table_before.catalog.clone(), schema_name: table_before.schema.clone(), @@ -391,7 +391,7 @@ mod tests { create_if_not_exists: false, table_options: Default::default(), engine: MITO_ENGINE.to_string(), - })), + }), }; assert!(catalog_manager.register_table(request).await.unwrap()); @@ -418,7 +418,7 @@ mod tests { schema: table_after.schema.clone(), table_name: table_after.table.clone(), table_id: table_after.table_id, - table: Arc::new(EmptyTable::new(CreateTableRequest { + table: EmptyTable::table(CreateTableRequest { id: table_after.table_id, catalog_name: table_after.catalog.clone(), schema_name: table_after.schema.clone(), @@ -430,7 +430,7 @@ mod tests { create_if_not_exists: false, table_options: Default::default(), engine: MITO_ENGINE.to_string(), - })), + }), }; assert!(catalog_manager.register_table(request).await.unwrap()); diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index a0729403bb..660ba969cd 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -30,7 +30,7 @@ use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, Ta use table::requests::{ AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest, }; -use table::{error as table_error, Result as TableResult, Table, TableRef}; +use table::{error as table_error, Result as TableResult, TableRef}; use tokio::sync::Mutex; use crate::config::EngineConfig; @@ -302,7 +302,7 @@ impl EngineInner { let _ = self.tables.write().unwrap().insert(table_id, table.clone()); - Ok(table) + Ok(table.as_table_ref()) } fn get_table(&self, table_id: TableId) -> Option { @@ -311,7 +311,7 @@ impl EngineInner { .unwrap() .get(&table_id) .cloned() - .map(|table| table as _) + .map(|table| table.as_table_ref()) } async fn open_table( @@ -365,7 +365,7 @@ impl EngineInner { ); let _ = self.tables.write().unwrap().insert(table_id, table.clone()); - Some(table as _) + Some(table.as_table_ref()) }; logging::info!( diff --git a/src/file-table-engine/src/engine/tests.rs b/src/file-table-engine/src/engine/tests.rs index f2d5dcf543..5dc9b8b660 100644 --- a/src/file-table-engine/src/engine/tests.rs +++ b/src/file-table-engine/src/engine/tests.rs @@ -17,15 +17,14 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE}; use table::engine::{EngineContext, TableEngine, TableEngineProcedure}; +use table::error as table_error; use table::requests::{ AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest, }; -use table::{error as table_error, Table}; use crate::config::EngineConfig; use crate::engine::immutable::ImmutableFileTableEngine; use crate::manifest::immutable::manifest_path; -use crate::table::immutable::ImmutableFileTable; use crate::test_util::{self, TestEngineComponents, TEST_TABLE_NAME}; #[tokio::test] @@ -78,11 +77,6 @@ async fn test_open_table() { .unwrap() .unwrap(); - let reopened = reopened - .as_any() - .downcast_ref::() - .unwrap(); - let left = table.table_info(); let right = reopened.table_info(); diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs index 04ebddbfc0..99b52cf98d 100644 --- a/src/file-table-engine/src/table/immutable.rs +++ b/src/file-table-engine/src/table/immutable.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::sync::Arc; -use async_trait::async_trait; use common_datasource::file_format::Format; use common_datasource::object_store::build_backend; use common_error::ext::BoxedError; @@ -24,10 +22,12 @@ use datatypes::schema::SchemaRef; use object_store::ObjectStore; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; +use store_api::data_source::DataSource; use store_api::storage::{RegionNumber, ScanRequest}; use table::error::{self as table_error, Result as TableResult}; -use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType}; -use table::{requests, Table}; +use table::metadata::{FilterPushDownType, RawTableInfo, TableInfo}; +use table::thin_table::{ThinTable, ThinTableAdapter}; +use table::{requests, TableRef}; use super::format::create_stream; use crate::error::{self, ConvertRawSnafu, Result}; @@ -45,68 +45,12 @@ pub struct ImmutableFileTableOptions { pub struct ImmutableFileTable { metadata: ImmutableMetadata, - // currently, it's immutable - table_info: Arc, - object_store: ObjectStore, - files: Vec, - format: Format, + table_ref: TableRef, } pub type ImmutableFileTableRef = Arc; -#[async_trait] -impl Table for ImmutableFileTable { - fn as_any(&self) -> &dyn Any { - self - } - - /// The [`SchemaRef`] before the projection. - /// It contains all the columns that may appear in the files (All missing columns should be filled NULLs). - fn schema(&self) -> SchemaRef { - self.table_info().meta.schema.clone() - } - - fn table_info(&self) -> TableInfoRef { - self.table_info.clone() - } - - fn table_type(&self) -> TableType { - self.table_info().table_type - } - - async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { - create_stream( - &self.format, - &CreateScanPlanContext::default(), - &ScanPlanConfig { - file_schema: self.schema(), - files: &self.files, - projection: request.projection.as_ref(), - filters: &request.filters, - limit: request.limit, - store: self.object_store.clone(), - }, - ) - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu) - } - - async fn flush( - &self, - _region_number: Option, - _wait: Option, - ) -> TableResult<()> { - // nothing to flush - Ok(()) - } -} - impl ImmutableFileTable { - #[inline] - pub fn metadata(&self) -> &ImmutableMetadata { - &self.metadata - } - pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Result { let table_info = Arc::new(table_info); let options = &table_info.meta.options.extra_options; @@ -129,12 +73,19 @@ impl ImmutableFileTable { let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?; + let schema = table_info.meta.schema.clone(); + let thin_table = ThinTable::new(table_info, FilterPushDownType::Unsupported); + let data_source = Arc::new(ImmutableFileDataSource::new( + schema, + object_store, + meta.files, + format, + )); + let table_ref = Arc::new(ThinTableAdapter::new(thin_table, data_source)); + Ok(Self { metadata, - table_info, - object_store, - files: meta.files, - format, + table_ref, }) } @@ -171,4 +122,63 @@ impl ImmutableFileTable { Ok((metadata, table_info)) } + + #[inline] + pub fn metadata(&self) -> &ImmutableMetadata { + &self.metadata + } + + pub fn as_table_ref(&self) -> TableRef { + self.table_ref.clone() + } + + pub async fn close(&self, regions: &[RegionNumber]) -> TableResult<()> { + self.table_ref.close(regions).await + } +} + +struct ImmutableFileDataSource { + schema: SchemaRef, + object_store: ObjectStore, + files: Vec, + format: Format, +} + +impl ImmutableFileDataSource { + fn new( + schema: SchemaRef, + object_store: ObjectStore, + files: Vec, + format: Format, + ) -> Self { + Self { + schema, + object_store, + files, + format, + } + } +} + +impl DataSource for ImmutableFileDataSource { + fn get_stream( + &self, + request: ScanRequest, + ) -> std::result::Result { + create_stream( + &self.format, + &CreateScanPlanContext::default(), + &ScanPlanConfig { + file_schema: self.schema.clone(), + files: &self.files, + projection: request.projection.as_ref(), + filters: &request.filters, + limit: request.limit, + store: self.object_store.clone(), + }, + ) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) + .map_err(BoxedError::new) + } } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index b8f92f26ea..946d0a28da 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -1429,7 +1429,7 @@ mod test { .meta(table_meta) .build() .unwrap(); - let table = Arc::new(EmptyTable::from_table_info(&table_info)); + let table = EmptyTable::from_table_info(&table_info); let catalog_list = MemoryCatalogManager::with_default_setup(); assert!(catalog_list .register_table(RegisterTableRequest { diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index b4d02daf76..aa6de6514c 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -377,7 +377,7 @@ mod test { .meta(table_meta) .build() .unwrap(); - let table = Arc::new(EmptyTable::from_table_info(&table_info)); + let table = EmptyTable::from_table_info(&table_info); let catalog_list = MemoryCatalogManager::with_default_setup(); assert!(catalog_list .register_table(RegisterTableRequest { diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 9d8cd8946d..a978d74c6c 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -475,6 +475,6 @@ mod test { data: Vec, ) -> TableRef { let record_batch = RecordBatch::new(table_schema, data).unwrap(); - Arc::new(MemTable::new(table_name, record_batch)) + MemTable::table(table_name, record_batch) } } diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs index cf44a5fe85..312bd4ea49 100644 --- a/src/query/src/tests.rs +++ b/src/query/src/tests.rs @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use catalog::local::MemoryCatalogManager; use common_query::Output; use common_recordbatch::{util, RecordBatch}; use session::context::QueryContext; -use table::test_util::MemTable; +use table::TableRef; use crate::parser::QueryLanguageParser; use crate::{QueryEngineFactory, QueryEngineRef}; @@ -50,8 +48,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec { util::collect(stream).await.unwrap() } -pub fn new_query_engine_with_table(table: MemTable) -> QueryEngineRef { - let table = Arc::new(table); +pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef { let catalog_manager = MemoryCatalogManager::new_with_table(table); QueryEngineFactory::new(catalog_manager, false).query_engine() diff --git a/src/query/src/tests/function.rs b/src/query/src/tests/function.rs index 99b2a490bf..39cd3e5068 100644 --- a/src/query/src/tests/function.rs +++ b/src/query/src/tests/function.rs @@ -48,7 +48,7 @@ pub fn create_query_engine() -> QueryEngineRef { let schema = Arc::new(Schema::new(column_schemas.clone())); let recordbatch = RecordBatch::new(schema, columns).unwrap(); - let number_table = MemTable::new("numbers", recordbatch); + let number_table = MemTable::table("numbers", recordbatch); new_query_engine_with_table(number_table) } diff --git a/src/query/src/tests/my_sum_udaf_example.rs b/src/query/src/tests/my_sum_udaf_example.rs index 6058bd0090..363816b4d6 100644 --- a/src/query/src/tests/my_sum_udaf_example.rs +++ b/src/query/src/tests/my_sum_udaf_example.rs @@ -202,7 +202,7 @@ where let schema = Arc::new(Schema::new(column_schemas.clone())); let column: VectorRef = Arc::new(T::VectorType::from_vec(numbers)); let recordbatch = RecordBatch::new(schema, vec![column]).unwrap(); - let testing_table = MemTable::new(&table_name, recordbatch); + let testing_table = MemTable::table(&table_name, recordbatch); let engine = new_query_engine_with_table(testing_table); diff --git a/src/query/src/tests/percentile_test.rs b/src/query/src/tests/percentile_test.rs index ce278aa986..1c01401b89 100644 --- a/src/query/src/tests/percentile_test.rs +++ b/src/query/src/tests/percentile_test.rs @@ -92,6 +92,6 @@ fn create_correctness_engine() -> Arc { columns.push(column); let schema = Arc::new(Schema::new(column_schemas)); - let number_table = MemTable::new("corr_numbers", RecordBatch::new(schema, columns).unwrap()); + let number_table = MemTable::table("corr_numbers", RecordBatch::new(schema, columns).unwrap()); new_query_engine_with_table(number_table) } diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 880f9fb68a..39197fdb6a 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -60,7 +60,7 @@ async fn test_datafusion_query_engine() -> Result<()> { (0..100).collect::>(), ))]; let recordbatch = RecordBatch::new(schema, columns).unwrap(); - let table = Arc::new(MemTable::new("numbers", recordbatch)); + let table = MemTable::table("numbers", recordbatch); let limit = 10; let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone())); diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 157b356425..de412c1e44 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -12,72 +12,58 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use catalog::local::new_memory_catalog_manager; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::ext::BoxedError; use common_query::prelude::Expr; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; +use store_api::data_source::{DataSource, DataSourceRef}; use store_api::storage::ScanRequest; -use table::metadata::{FilterPushDownType, TableInfoRef, TableType}; +use table::metadata::FilterPushDownType; use table::predicate::TimeRangePredicateBuilder; use table::test_util::MemTable; -use table::Table; -use tokio::sync::RwLock; +use table::thin_table::{ThinTable, ThinTableAdapter}; +use table::TableRef; use crate::tests::exec_selection; use crate::{QueryEngineFactory, QueryEngineRef}; -struct MemTableWrapper { - inner: MemTable, - filter: RwLock>, -} +struct MemTableWrapper; impl MemTableWrapper { - pub async fn get_filters(&self) -> Vec { - self.filter.write().await.drain(..).collect() + pub fn table(table: TableRef, filter: Arc>>) -> TableRef { + let table_info = table.table_info(); + let thin_table_adapter = table.as_any().downcast_ref::().unwrap(); + let data_source = thin_table_adapter.data_source(); + + let thin_table = ThinTable::new(table_info, FilterPushDownType::Exact); + let data_source = Arc::new(DataSourceWrapper { + inner: data_source, + filter, + }); + + Arc::new(ThinTableAdapter::new(thin_table, data_source)) } } -#[async_trait::async_trait] -impl Table for MemTableWrapper { - fn as_any(&self) -> &dyn Any { - self - } +struct DataSourceWrapper { + inner: DataSourceRef, + filter: Arc>>, +} - fn schema(&self) -> SchemaRef { - self.inner.schema() - } - - fn table_info(&self) -> TableInfoRef { - self.inner.table_info() - } - - fn table_type(&self) -> TableType { - self.inner.table_type() - } - - async fn scan_to_stream( - &self, - request: ScanRequest, - ) -> table::Result { - *self.filter.write().await = request.filters.clone(); - self.inner.scan_to_stream(request).await - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> table::Result> { - Ok(vec![FilterPushDownType::Exact; filters.len()]) +impl DataSource for DataSourceWrapper { + fn get_stream(&self, request: ScanRequest) -> Result { + *self.filter.write().unwrap() = request.filters.clone(); + self.inner.get_stream(request) } } @@ -92,8 +78,9 @@ fn create_test_engine() -> TimeRangeTester { ]) .unwrap(); - let table = Arc::new(MemTableWrapper { - inner: MemTable::new( + let filter = Arc::new(RwLock::new(vec![])); + let table = MemTableWrapper::table( + MemTable::table( "m", RecordBatch::new( Arc::new(schema), @@ -106,8 +93,8 @@ fn create_test_engine() -> TimeRangeTester { ) .unwrap(), ), - filter: Default::default(), - }); + filter.clone(), + ); let catalog_manager = new_memory_catalog_manager().unwrap(); let req = RegisterTableRequest { @@ -120,22 +107,26 @@ fn create_test_engine() -> TimeRangeTester { let _ = catalog_manager.register_table_sync(req).unwrap(); let engine = QueryEngineFactory::new(catalog_manager, false).query_engine(); - TimeRangeTester { engine, table } + TimeRangeTester { engine, filter } } struct TimeRangeTester { engine: QueryEngineRef, - table: Arc, + filter: Arc>>, } impl TimeRangeTester { async fn check(&self, sql: &str, expect: TimestampRange) { let _ = exec_selection(self.engine.clone(), sql).await; - let filters = self.table.get_filters().await; + let filters = self.get_filters(); let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build(); assert_eq!(expect, range); } + + fn get_filters(&self) -> Vec { + self.filter.write().unwrap().drain(..).collect() + } } #[tokio::test] diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 77cd6c8af4..813a39045e 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -582,7 +582,7 @@ mod tests { .unwrap(); let ctx = SessionContext::new(); - let table = Arc::new(MemTable::new("test", recordbatch)); + let table = MemTable::table("test", recordbatch); let table_provider = Arc::new(DfTableProviderAdapter::new(table)); let dataframe = ctx.read_table(table_provider.clone()).unwrap(); diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index 96becf5f94..e01498fa24 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -30,6 +30,7 @@ use servers::query_handler::grpc::ServerGrpcQueryHandlerRef; use servers::server::Server; use snafu::ResultExt; use table::test_util::MemTable; +use table::TableRef; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; @@ -99,7 +100,7 @@ impl Server for MockGrpcServer { } } -fn create_grpc_server(table: MemTable) -> Result> { +fn create_grpc_server(table: TableRef) -> Result> { let query_handler = create_testing_grpc_query_handler(table); let io_runtime = Arc::new( RuntimeBuilder::default() diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index d10fc1b718..bbd38ac27c 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -34,7 +34,7 @@ use servers::query_handler::{ScriptHandler, ScriptHandlerRef}; use session::context::QueryContextRef; use snafu::ensure; use sql::statements::statement::Statement; -use table::test_util::MemTable; +use table::TableRef; mod grpc; mod http; @@ -202,21 +202,20 @@ impl GrpcQueryHandler for DummyInstance { } } -fn create_testing_instance(table: MemTable) -> DummyInstance { - let table = Arc::new(table); +fn create_testing_instance(table: TableRef) -> DummyInstance { let catalog_manager = MemoryCatalogManager::new_with_table(table); let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine(); DummyInstance::new(query_engine) } -fn create_testing_script_handler(table: MemTable) -> ScriptHandlerRef { +fn create_testing_script_handler(table: TableRef) -> ScriptHandlerRef { Arc::new(create_testing_instance(table)) as _ } -fn create_testing_sql_query_handler(table: MemTable) -> ServerSqlQueryHandlerRef { +fn create_testing_sql_query_handler(table: TableRef) -> ServerSqlQueryHandlerRef { Arc::new(create_testing_instance(table)) as _ } -fn create_testing_grpc_query_handler(table: MemTable) -> ServerGrpcQueryHandlerRef { +fn create_testing_grpc_query_handler(table: TableRef) -> ServerGrpcQueryHandlerRef { Arc::new(create_testing_instance(table)) as _ } diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index d29e09f7d9..9a884eea0b 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -32,6 +32,7 @@ use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::server::Server; use servers::tls::TlsOption; use table::test_util::MemTable; +use table::TableRef; use crate::create_testing_sql_query_handler; use crate::mysql::{all_datatype_testing_data, MysqlTextRow, TestingData}; @@ -43,7 +44,7 @@ struct MysqlOpts<'a> { reject_no_database: bool, } -fn create_mysql_server(table: MemTable, opts: MysqlOpts<'_>) -> Result> { +fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) -> Result> { let query_handler = create_testing_sql_query_handler(table); let io_runtime = Arc::new( RuntimeBuilder::default() @@ -262,7 +263,7 @@ async fn test_server_required_secure_client_plain() -> Result<()> { } = all_datatype_testing_data(); let schema = Arc::new(Schema::new(column_schemas.clone())); let recordbatch = RecordBatch::new(schema, columns).unwrap(); - let table = MemTable::new("all_datatypes", recordbatch); + let table = MemTable::table("all_datatypes", recordbatch); let mysql_server = create_mysql_server( table, @@ -299,7 +300,7 @@ async fn test_server_required_secure_client_plain_with_pkcs8_priv_key() -> Resul } = all_datatype_testing_data(); let schema = Arc::new(Schema::new(column_schemas.clone())); let recordbatch = RecordBatch::new(schema, columns).unwrap(); - let table = MemTable::new("all_datatypes", recordbatch); + let table = MemTable::table("all_datatypes", recordbatch); let mysql_server = create_mysql_server( table, @@ -331,7 +332,7 @@ async fn test_db_name() -> Result<()> { } = all_datatype_testing_data(); let schema = Arc::new(Schema::new(column_schemas.clone())); let recordbatch = RecordBatch::new(schema, columns).unwrap(); - let table = MemTable::new("all_datatypes", recordbatch); + let table = MemTable::table("all_datatypes", recordbatch); let mysql_server = create_mysql_server( table, @@ -363,7 +364,7 @@ async fn do_test_query_all_datatypes(server_tls: TlsOption, client_tls: bool) -> } = all_datatype_testing_data(); let schema = Arc::new(Schema::new(column_schemas.clone())); let recordbatch = RecordBatch::new(schema, columns).unwrap(); - let table = MemTable::new("all_datatypes", recordbatch); + let table = MemTable::table("all_datatypes", recordbatch); let mysql_server = create_mysql_server( table, @@ -462,7 +463,7 @@ async fn test_query_prepared() -> Result<()> { } = all_datatype_testing_data(); let schema = Arc::new(Schema::new(column_schemas.clone())); let recordbatch = RecordBatch::new(schema, columns.clone()).unwrap(); - let table = MemTable::new("all_datatypes", recordbatch); + let table = MemTable::table("all_datatypes", recordbatch); let mysql_server = create_mysql_server( table, diff --git a/src/servers/tests/postgres/mod.rs b/src/servers/tests/postgres/mod.rs index c4f190e0ed..e80d7b7e38 100644 --- a/src/servers/tests/postgres/mod.rs +++ b/src/servers/tests/postgres/mod.rs @@ -30,12 +30,13 @@ use servers::postgres::PostgresServer; use servers::server::Server; use servers::tls::TlsOption; use table::test_util::MemTable; +use table::TableRef; use tokio_postgres::{Client, Error as PgError, NoTls, SimpleQueryMessage}; use crate::create_testing_instance; fn create_postgres_server( - table: MemTable, + table: TableRef, check_pwd: bool, tls: TlsOption, auth_info: Option, diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index f20817f791..8fde98e440 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -14,20 +14,23 @@ use std::sync::Arc; -use async_trait::async_trait; +use common_error::ext::BoxedError; use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; +use datatypes::schema::SchemaRef; +use store_api::data_source::DataSource; use store_api::storage::ScanRequest; -use crate::metadata::{TableInfo, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; -use crate::requests::{CreateTableRequest, InsertRequest}; -use crate::{Result, Table}; +use crate::metadata::{ + FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, +}; +use crate::requests::CreateTableRequest; +use crate::thin_table::{ThinTable, ThinTableAdapter}; +use crate::TableRef; -pub struct EmptyTable { - info: TableInfoRef, -} +pub struct EmptyTable; impl EmptyTable { - pub fn new(req: CreateTableRequest) -> Self { + pub fn table(req: CreateTableRequest) -> TableRef { let schema = Arc::new(req.schema.try_into().unwrap()); let table_meta = TableMetaBuilder::default() .schema(schema) @@ -48,41 +51,27 @@ impl EmptyTable { .build() .unwrap(); - Self { - info: Arc::new(table_info), - } + Self::from_table_info(&table_info) } - pub fn from_table_info(info: &TableInfo) -> Self { - Self { - info: Arc::new(info.clone()), - } + pub fn from_table_info(info: &TableInfo) -> TableRef { + let thin_table = ThinTable::new(Arc::new(info.clone()), FilterPushDownType::Unsupported); + let data_source = Arc::new(EmptyDataSource { + schema: info.meta.schema.clone(), + }); + Arc::new(ThinTableAdapter::new(thin_table, data_source)) } } -#[async_trait] -impl Table for EmptyTable { - fn as_any(&self) -> &dyn std::any::Any { - self as _ - } +struct EmptyDataSource { + schema: SchemaRef, +} - fn schema(&self) -> datatypes::schema::SchemaRef { - self.info.meta.schema.clone() - } - - fn table_info(&self) -> TableInfoRef { - self.info.clone() - } - - fn table_type(&self) -> TableType { - self.info.table_type - } - - async fn insert(&self, _request: InsertRequest) -> Result { - Ok(0) - } - - async fn scan_to_stream(&self, _: ScanRequest) -> Result { - Ok(Box::pin(EmptyRecordBatchStream::new(self.schema()))) +impl DataSource for EmptyDataSource { + fn get_stream( + &self, + _request: ScanRequest, + ) -> std::result::Result { + Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) } } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index e1aa84b5a8..35dbdf3d30 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::pin::Pin; use std::sync::Arc; -use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_recordbatch::error::Result as RecordBatchResult; @@ -27,22 +25,20 @@ use datatypes::vectors::UInt32Vector; use futures::task::{Context, Poll}; use futures::Stream; use snafu::prelude::*; +use store_api::data_source::DataSource; use store_api::storage::{RegionNumber, ScanRequest}; -use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu}; +use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu}; use crate::metadata::{ - TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion, + FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion, }; -use crate::{ColumnStatistics, Table, TableStatistics}; +use crate::thin_table::{ThinTable, ThinTableAdapter}; +use crate::TableRef; -#[derive(Debug, Clone)] -pub struct MemTable { - info: TableInfoRef, - recordbatch: RecordBatch, -} +pub struct MemTable; impl MemTable { - pub fn new(table_name: impl Into, recordbatch: RecordBatch) -> Self { + pub fn table(table_name: impl Into, recordbatch: RecordBatch) -> TableRef { Self::new_with_region(table_name, recordbatch, vec![0]) } @@ -50,7 +46,7 @@ impl MemTable { table_name: impl Into, recordbatch: RecordBatch, regions: Vec, - ) -> Self { + ) -> TableRef { Self::new_with_catalog( table_name, recordbatch, @@ -68,7 +64,7 @@ impl MemTable { catalog_name: String, schema_name: String, regions: Vec, - ) -> Self { + ) -> TableRef { let schema = recordbatch.schema.clone(); let meta = TableMetaBuilder::default() @@ -98,16 +94,14 @@ impl MemTable { .unwrap(), ); - Self { info, recordbatch } - } - - pub fn table_name(&self) -> &str { - &self.info.name + let thin_table = ThinTable::new(info, FilterPushDownType::Unsupported); + let data_source = Arc::new(MemtableDataSource { recordbatch }); + Arc::new(ThinTableAdapter::new(thin_table, data_source)) } /// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and /// column type "uint32". Column data increased from 0 to 100. - pub fn default_numbers_table() -> Self { + pub fn default_numbers_table() -> TableRef { let column_schemas = vec![ColumnSchema::new( "uint32s", ConcreteDataType::uint32_datatype(), @@ -118,34 +112,25 @@ impl MemTable { (0..100).collect::>(), ))]; let recordbatch = RecordBatch::new(schema, columns).unwrap(); - MemTable::new("numbers", recordbatch) + MemTable::table("numbers", recordbatch) } } -#[async_trait] -impl Table for MemTable { - fn as_any(&self) -> &dyn Any { - self - } +struct MemtableDataSource { + recordbatch: RecordBatch, +} - fn schema(&self) -> SchemaRef { - self.recordbatch.schema.clone() - } - - fn table_info(&self) -> TableInfoRef { - self.info.clone() - } - - fn table_type(&self) -> TableType { - self.info.table_type - } - - async fn scan_to_stream(&self, request: ScanRequest) -> Result { +impl DataSource for MemtableDataSource { + fn get_stream( + &self, + request: ScanRequest, + ) -> std::result::Result { let df_recordbatch = if let Some(indices) = request.projection { self.recordbatch .df_record_batch() .project(&indices) - .context(TableProjectionSnafu)? + .context(TableProjectionSnafu) + .map_err(BoxedError::new)? } else { self.recordbatch.df_record_batch().clone() }; @@ -159,41 +144,22 @@ impl Table for MemTable { let df_recordbatch = df_recordbatch.slice(0, limit); let recordbatch = RecordBatch::try_from_df_record_batch( - Arc::new(Schema::try_from(df_recordbatch.schema()).context(SchemaConversionSnafu)?), + Arc::new( + Schema::try_from(df_recordbatch.schema()) + .context(SchemaConversionSnafu) + .map_err(BoxedError::new)?, + ), df_recordbatch, ) .map_err(BoxedError::new) - .context(TablesRecordBatchSnafu)?; + .context(TablesRecordBatchSnafu) + .map_err(BoxedError::new)?; Ok(Box::pin(MemtableStream { schema: recordbatch.schema.clone(), recordbatch: Some(recordbatch), })) } - - fn statistics(&self) -> Option { - let df_recordbatch = self.recordbatch.df_record_batch(); - let num_rows = df_recordbatch.num_rows(); - let total_byte_size = df_recordbatch.get_array_memory_size(); - let column_statistics: Vec<_> = df_recordbatch - .columns() - .iter() - .map(|col| { - let null_count = col.null_count(); - ColumnStatistics { - null_count: Some(null_count), - // TODO(discord9): implement more statistics - ..Default::default() - } - }) - .collect(); - Some(TableStatistics { - num_rows: Some(num_rows), - total_byte_size: Some(total_byte_size), - column_statistics: Some(column_statistics), - is_exact: true, - }) - } } impl RecordBatchStream for MemtableStream { @@ -278,7 +244,7 @@ mod test { assert_eq!(vec!["hello"], string_column); } - fn build_testing_table() -> MemTable { + fn build_testing_table() -> TableRef { let i32_column_schema = ColumnSchema::new("i32_numbers", ConcreteDataType::int32_datatype(), true); let string_column_schema = @@ -301,6 +267,6 @@ mod test { ])), ]; let recordbatch = RecordBatch::new(schema, columns).unwrap(); - MemTable::new("", recordbatch) + MemTable::table("", recordbatch) } } diff --git a/src/table/src/test_util/mock_engine.rs b/src/table/src/test_util/mock_engine.rs index f6eba2adb8..8d88d93666 100644 --- a/src/table/src/test_util/mock_engine.rs +++ b/src/table/src/test_util/mock_engine.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; use async_trait::async_trait; use common_procedure::BoxedProcedure; @@ -53,7 +52,7 @@ impl TableEngine for MockTableEngine { let schema_name = request.schema_name.clone(); let table_name = request.table_name.clone(); - let table_ref = Arc::new(EmptyTable::new(request)); + let table_ref = EmptyTable::table(request); let _ = self .tables diff --git a/src/table/src/thin_table.rs b/src/table/src/thin_table.rs index b10dd84acb..df8c678a7e 100644 --- a/src/table/src/thin_table.rs +++ b/src/table/src/thin_table.rs @@ -51,6 +51,10 @@ impl ThinTableAdapter { pub fn new(table: ThinTable, data_source: DataSourceRef) -> Self { Self { table, data_source } } + + pub fn data_source(&self) -> DataSourceRef { + self.data_source.clone() + } } #[async_trait]