diff --git a/Cargo.lock b/Cargo.lock index aaca510c99..7873434f40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3712,7 +3712,6 @@ dependencies = [ "stats-cli", "streaming-stats", "table", - "test-util", "tokio", "tokio-stream", ] @@ -4574,7 +4573,7 @@ dependencies = [ "serde", "serde_json", "snafu", - "test-util", + "table", "tokio", "tokio-postgres", "tokio-stream", @@ -5119,22 +5118,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "test-util" -version = "0.1.0" -dependencies = [ - "arrow2", - "async-trait", - "common-query", - "common-recordbatch", - "datafusion", - "datatypes", - "futures", - "snafu", - "table", - "tokio", -] - [[package]] name = "textwrap" version = "0.11.0" diff --git a/Cargo.toml b/Cargo.toml index 4d3dfea88f..8b4fbec25f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,5 +30,4 @@ members = [ "src/store-api", "src/table", "src/table-engine", - "test-util", ] diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 6525783485..cf2b5b01f3 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -12,7 +12,9 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ + "simd", +] } datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 041e60f03a..dd5e9d9f27 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -16,7 +16,7 @@ pub mod error; mod manager; pub mod memory; pub mod schema; -mod system; +pub mod system; pub mod tables; /// Represent a list of named catalogs diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index a7bc2aa262..e306b7a8db 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -328,18 +328,12 @@ pub struct TableEntryValue { #[cfg(test)] mod tests { - use datafusion::execution::runtime_env::RuntimeEnv; - use datafusion::field_util::SchemaExt; - use datatypes::arrow; use log_store::fs::noop::NoopLogStore; use object_store::ObjectStore; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; - use table::engine::TableEngine; use table::metadata::TableType; use table::metadata::TableType::Base; - use table::requests::{AlterTableRequest, DropTableRequest}; - use table::table::adapter::TableAdapter; use table_engine::config::EngineConfig; use table_engine::engine::MitoEngine; use tempdir::TempDir; @@ -416,84 +410,7 @@ mod tests { assert!(EntryType::try_from(4).is_err()); } - struct MockTableEngine { - table_name: String, - sole_table: TableRef, - } - - impl Default for MockTableEngine { - fn default() -> Self { - Self { - table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), - sole_table: Arc::new( - TableAdapter::new( - Arc::new(datafusion::datasource::empty::EmptyTable::new(Arc::new( - arrow::datatypes::Schema::empty(), - ))), - Arc::new(RuntimeEnv::default()), - ) - .unwrap(), - ), - } - } - } - - #[async_trait::async_trait] - impl TableEngine for MockTableEngine { - fn name(&self) -> &str { - "MockTableEngine" - } - - async fn create_table( - &self, - _ctx: &EngineContext, - _request: CreateTableRequest, - ) -> table::Result { - unreachable!() - } - - async fn open_table( - &self, - _ctx: &EngineContext, - request: OpenTableRequest, - ) -> table::Result> { - if request.table_name == self.table_name { - Ok(Some(self.sole_table.clone())) - } else { - Ok(None) - } - } - - async fn alter_table( - &self, - _ctx: &EngineContext, - _request: AlterTableRequest, - ) -> table::Result { - unreachable!() - } - - fn get_table(&self, _ctx: &EngineContext, name: &str) -> table::Result> { - if name == self.table_name { - Ok(Some(self.sole_table.clone())) - } else { - Ok(None) - } - } - - fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool { - name == self.table_name - } - - async fn drop_table( - &self, - _ctx: &EngineContext, - _request: DropTableRequest, - ) -> table::Result<()> { - unreachable!() - } - } - - async fn prepare_table_engine() -> (TempDir, TableEngineRef) { + pub async fn prepare_table_engine() -> (TempDir, TableEngineRef) { let dir = TempDir::new("system-table-test").unwrap(); let store_dir = dir.path().to_string_lossy(); let accessor = opendal::services::fs::Builder::default() diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index d0d8ea983e..831b43e908 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -13,7 +13,9 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ + "simd", +] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } @@ -30,7 +32,16 @@ tokio = "1.0" [dependencies.arrow] package = "arrow2" version = "0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] +features = [ + "io_csv", + "io_json", + "io_parquet", + "io_parquet_compression", + "io_ipc", + "ahash", + "compute", + "serde_types", +] [dev-dependencies] approx_eq = "0.1" @@ -43,7 +54,5 @@ rand = "0.8" statrs = "0.15" stats-cli = "3.0" streaming-stats = "0.2" -test-util = { path = "../../test-util" } tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" - diff --git a/src/query/tests/function.rs b/src/query/tests/function.rs index 37428dc76a..3d51c7cd51 100644 --- a/src/query/tests/function.rs +++ b/src/query/tests/function.rs @@ -14,7 +14,7 @@ use datatypes::vectors::PrimitiveVector; use query::query_engine::QueryEngineFactory; use query::QueryEngine; use rand::Rng; -use test_util::MemTable; +use table::test_util::MemTable; pub fn create_query_engine() -> Arc { let schema_provider = Arc::new(MemorySchemaProvider::new()); diff --git a/src/query/tests/my_sum_udaf_example.rs b/src/query/tests/my_sum_udaf_example.rs index 805f8784ce..a2fd37ded6 100644 --- a/src/query/tests/my_sum_udaf_example.rs +++ b/src/query/tests/my_sum_udaf_example.rs @@ -26,7 +26,7 @@ use datatypes::with_match_primitive_type_id; use num_traits::AsPrimitive; use query::error::Result; use query::QueryEngineFactory; -use test_util::MemTable; +use table::test_util::MemTable; #[derive(Debug, Default)] struct MySumAccumulator diff --git a/src/query/tests/percentile_test.rs b/src/query/tests/percentile_test.rs index 08bf7df854..37511fde6f 100644 --- a/src/query/tests/percentile_test.rs +++ b/src/query/tests/percentile_test.rs @@ -18,7 +18,7 @@ use function::{create_query_engine, get_numbers_from_table}; use num_traits::AsPrimitive; use query::error::Result; use query::{QueryEngine, QueryEngineFactory}; -use test_util::MemTable; +use table::test_util::MemTable; #[tokio::test] async fn test_percentile_aggregator() -> Result<()> { diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index a1c9d79882..48e5045900 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -27,7 +27,7 @@ use query::QueryEngine; use rand::Rng; use table::table::adapter::DfTableProviderAdapter; use table::table::numbers::NumbersTable; -use test_util::MemTable; +use table::test_util::MemTable; use crate::pow::pow; diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 4d2df5d0bb..8acc19bd2d 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -43,6 +43,6 @@ mysql_async = { git = "https://github.com/Morranto/mysql_async.git", rev = "127b rand = "0.8" script = { path = "../script", features = ["python"] } query = { path = "../query" } -test-util = { path = "../../test-util" } +table = { path = "../table" } tokio-postgres = "0.7" tokio-test = "0.4" diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 7177c9a71d..c2eb37b29b 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -6,7 +6,7 @@ use metrics::counter; use servers::http::handler as http_handler; use servers::http::handler::ScriptExecution; use servers::http::{HttpResponse, JsonOutput}; -use test_util::MemTable; +use table::test_util::MemTable; use crate::create_testing_sql_query_handler; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index e907f032b4..28726853c0 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -10,7 +10,7 @@ use common_query::Output; use query::{QueryEngineFactory, QueryEngineRef}; use servers::error::Result; use servers::query_handler::{SqlQueryHandler, SqlQueryHandlerRef}; -use test_util::MemTable; +use table::test_util::MemTable; mod http; mod mysql; diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index ba3f99992d..91e5f29367 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -11,7 +11,7 @@ use rand::Rng; use servers::error::Result; use servers::mysql::server::MysqlServer; use servers::server::Server; -use test_util::MemTable; +use table::test_util::MemTable; use crate::create_testing_sql_query_handler; use crate::mysql::{all_datatype_testing_data, MysqlTextRow, TestingData}; diff --git a/src/servers/tests/postgres/mod.rs b/src/servers/tests/postgres/mod.rs index 2970a92959..714e5e097b 100644 --- a/src/servers/tests/postgres/mod.rs +++ b/src/servers/tests/postgres/mod.rs @@ -8,7 +8,7 @@ use rand::Rng; use servers::error::Result; use servers::postgres::PostgresServer; use servers::server::Server; -use test_util::MemTable; +use table::test_util::MemTable; use tokio_postgres::{Client, Error as PgError, NoTls, SimpleQueryMessage}; use crate::create_testing_sql_query_handler; diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 21f1023d3b..41ef7c815f 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -10,7 +10,9 @@ common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ + "simd", +] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } derive_builder = "0.11" @@ -20,9 +22,9 @@ paste = "1.0" serde = "1.0.136" snafu = { version = "0.7", features = ["backtraces"] } store-api = { path = "../store-api" } +tokio = { version = "1.18", features = ["full"] } [dev-dependencies] datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } tempdir = "0.3" -tokio = { version = "1.18", features = ["full"] } tokio-util = { version = "0.7", features = ["compat"] } diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index f02821500d..808d4714c4 100644 --- a/src/table/src/lib.rs +++ b/src/table/src/lib.rs @@ -4,6 +4,7 @@ pub mod metadata; pub mod predicate; pub mod requests; pub mod table; +pub mod test_util; pub use crate::error::{Error, Result}; pub use crate::table::{Table, TableRef}; diff --git a/src/table/src/test_util.rs b/src/table/src/test_util.rs new file mode 100644 index 0000000000..49cb19c2e1 --- /dev/null +++ b/src/table/src/test_util.rs @@ -0,0 +1,7 @@ +mod empty_table; +mod memtable; +mod mock_engine; + +pub use empty_table::EmptyTable; +pub use memtable::MemTable; +pub use mock_engine::MockTableEngine; diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs new file mode 100644 index 0000000000..ebe2e418c7 --- /dev/null +++ b/src/table/src/test_util/empty_table.rs @@ -0,0 +1,70 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; + +use crate::metadata::TableInfoBuilder; +use crate::metadata::TableInfoRef; +use crate::requests::InsertRequest; +use crate::Result; +use crate::{ + metadata::{TableMetaBuilder, TableType}, + requests::CreateTableRequest, + Table, +}; + +pub struct EmptyTable { + info: TableInfoRef, +} + +impl EmptyTable { + pub fn new(req: CreateTableRequest) -> Self { + let table_meta = TableMetaBuilder::default() + .schema(req.schema) + .primary_key_indices(req.primary_key_indices) + .next_column_id(0) + .options(req.table_options) + .build(); + let table_info = TableInfoBuilder::default() + .catalog_name(req.catalog_name) + .schema_name(req.schema_name) + .name(req.table_name) + .meta(table_meta.unwrap()) + .table_type(TableType::Temporary) + .desc(req.desc) + .build() + .unwrap(); + + Self { + info: Arc::new(table_info), + } + } +} + +#[async_trait] +impl Table for EmptyTable { + fn as_any(&self) -> &dyn std::any::Any { + self as _ + } + + fn schema(&self) -> datatypes::schema::SchemaRef { + self.info.meta.schema.clone() + } + + fn table_info(&self) -> TableInfoRef { + self.info.clone() + } + + async fn insert(&self, _request: InsertRequest) -> Result { + Ok(0) + } + + async fn scan( + &self, + _projection: &Option>, + _filters: &[common_query::prelude::Expr], + _limit: Option, + ) -> Result { + Ok(Box::pin(EmptyRecordBatchStream::new(self.schema()))) + } +} diff --git a/test-util/src/memtable.rs b/src/table/src/test_util/memtable.rs similarity index 98% rename from test-util/src/memtable.rs rename to src/table/src/test_util/memtable.rs index 1869233395..dd923fc3fc 100644 --- a/test-util/src/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -12,9 +12,10 @@ use datatypes::vectors::UInt32Vector; use futures::task::{Context, Poll}; use futures::Stream; use snafu::prelude::*; -use table::error::{Result, SchemaConversionSnafu, TableProjectionSnafu}; -use table::metadata::TableInfoRef; -use table::Table; + +use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu}; +use crate::metadata::TableInfoRef; +use crate::Table; #[derive(Debug, Clone)] pub struct MemTable { diff --git a/src/table/src/test_util/mock_engine.rs b/src/table/src/test_util/mock_engine.rs new file mode 100644 index 0000000000..ff05380a63 --- /dev/null +++ b/src/table/src/test_util/mock_engine.rs @@ -0,0 +1,87 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::Mutex; + +use crate::test_util::EmptyTable; +use crate::{ + engine::{EngineContext, TableEngine}, + requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}, + Result, TableRef, +}; + +#[derive(Default)] +pub struct MockTableEngine { + tables: Mutex>, +} + +impl MockTableEngine { + pub fn new() -> Self { + Self::default() + } +} + +#[async_trait] +impl TableEngine for MockTableEngine { + fn name(&self) -> &str { + "MockTableEngine" + } + + async fn create_table( + &self, + _ctx: &EngineContext, + request: CreateTableRequest, + ) -> Result { + let catalog_name = request.catalog_name.clone(); + let schema_name = request.schema_name.clone(); + let table_name = request.table_name.clone(); + + let table_ref = Arc::new(EmptyTable::new(request)); + + self.tables + .lock() + .await + .insert((catalog_name, schema_name, table_name), table_ref.clone()); + Ok(table_ref) + } + + async fn open_table( + &self, + _ctx: &EngineContext, + request: OpenTableRequest, + ) -> Result> { + let catalog_name = request.catalog_name; + let schema_name = request.schema_name; + let table_name = request.table_name; + + let res = self + .tables + .lock() + .await + .get(&(catalog_name, schema_name, table_name)) + .cloned(); + + Ok(res) + } + + async fn alter_table( + &self, + _ctx: &EngineContext, + _request: AlterTableRequest, + ) -> Result { + unimplemented!() + } + + fn get_table(&self, _ctx: &EngineContext, _name: &str) -> Result> { + unimplemented!() + } + + fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool { + unimplemented!() + } + + async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result<()> { + unimplemented!() + } +} diff --git a/test-util/Cargo.toml b/test-util/Cargo.toml deleted file mode 100644 index 740bd043e1..0000000000 --- a/test-util/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "test-util" -version = "0.1.0" -edition = "2021" - -[dependencies] -async-trait = "0.1" -common-query = { path = "../src/common/query" } -common-recordbatch = { path = "../src/common/recordbatch" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } -datatypes = { path = "../src/datatypes" } -futures = "0.3" -snafu = { version = "0.7", features = ["backtraces"] } -table = { path = "../src/table" } -tokio = { version = "1.20", features = ["full"] } - -[dependencies.arrow] -package = "arrow2" -version = "0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] diff --git a/test-util/src/lib.rs b/test-util/src/lib.rs deleted file mode 100644 index 135489fa90..0000000000 --- a/test-util/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod memtable; - -pub use memtable::MemTable;