From 7395920bc87f5dc02e80ddb8ba5ca54730226e03 Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 4 Aug 2022 11:05:28 +0800 Subject: [PATCH] move catalog-related traits and struct to a catalog crate (#134) --- Cargo.lock | 14 ++++++++++++++ Cargo.toml | 1 + src/catalog/Cargo.toml | 16 ++++++++++++++++ src/{query => catalog}/src/catalog.rs | 4 +--- src/catalog/src/error.rs | 11 +++++++++++ src/catalog/src/lib.rs | 8 ++++++++ src/{query/src/catalog => catalog/src}/memory.rs | 3 +-- src/{query/src/catalog => catalog/src}/schema.rs | 0 src/datanode/Cargo.toml | 1 + src/datanode/src/catalog.rs | 1 - src/datanode/src/datanode.rs | 5 ++--- src/datanode/src/error.rs | 16 +++++++++++++--- src/datanode/src/instance.rs | 7 +++---- src/datanode/src/lib.rs | 1 - src/datanode/src/server/http/handler.rs | 3 +-- src/datanode/src/sql.rs | 16 +++++++++------- src/datanode/src/sql/insert.rs | 2 +- src/datanode/src/tests/http_test.rs | 3 +-- src/query/Cargo.toml | 13 +++++++------ src/query/src/datafusion.rs | 5 ++--- src/query/src/datafusion/catalog_adapter.rs | 13 ++++++++----- src/query/src/datafusion/error.rs | 6 ++++++ src/query/src/error.rs | 6 ++++++ src/query/src/lib.rs | 1 - src/query/src/query_engine.rs | 5 ++--- src/query/src/query_engine/state.rs | 2 +- src/query/tests/my_sum_udaf_example.rs | 5 ++--- src/query/tests/query_engine_test.rs | 9 ++++----- 28 files changed, 121 insertions(+), 56 deletions(-) create mode 100644 src/catalog/Cargo.toml rename src/{query => catalog}/src/catalog.rs (95%) create mode 100644 src/catalog/src/error.rs create mode 100644 src/catalog/src/lib.rs rename src/{query/src/catalog => catalog/src}/memory.rs (98%) rename src/{query/src/catalog => catalog/src}/schema.rs (100%) delete mode 100644 src/datanode/src/catalog.rs diff --git a/Cargo.lock b/Cargo.lock index 502f25438b..10fabc50eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,6 +500,18 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2698f953def977c68f935bb0dfa959375ad4638570e969e2f1e9f433cbf1af6" +[[package]] +name = "catalog" +version = "0.1.0" +dependencies = [ + "common-error", + "common-telemetry", + "datafusion", + "snafu", + "table", + "tokio", +] + [[package]] name = "cc" version = "1.0.73" @@ -1155,6 +1167,7 @@ dependencies = [ "axum", "axum-macros", "axum-test-helper", + "catalog", "common-error", "common-query", "common-recordbatch", @@ -2962,6 +2975,7 @@ dependencies = [ "arc-swap", "arrow2", "async-trait", + "catalog", "common-error", "common-function", "common-query", diff --git a/Cargo.toml b/Cargo.toml index 982d49ddd4..00274c419d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "src/api", + "src/catalog", "src/client", "src/common/base", "src/common/error", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml new file mode 100644 index 0000000000..649786a08e --- /dev/null +++ b/src/catalog/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "catalog" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +common-error = { path = "../common/error" } +common-telemetry = { path = "../common/telemetry" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +snafu = { version = "0.7", features = ["backtraces"] } +table = { path = "../table" } + +[dev-dependencies] +tokio = { version = "1.0", features = ["full"] } diff --git a/src/query/src/catalog.rs b/src/catalog/src/catalog.rs similarity index 95% rename from src/query/src/catalog.rs rename to src/catalog/src/catalog.rs index 5c0a845ab6..c2040eedc1 100644 --- a/src/query/src/catalog.rs +++ b/src/catalog/src/catalog.rs @@ -1,9 +1,7 @@ -pub mod memory; -pub mod schema; use std::any::Any; use std::sync::Arc; -use crate::catalog::schema::SchemaProvider; +use crate::schema::SchemaProvider; /// Represent a list of named catalogs pub trait CatalogList: Sync + Send { diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs new file mode 100644 index 0000000000..1253da97d2 --- /dev/null +++ b/src/catalog/src/error.rs @@ -0,0 +1,11 @@ +use datafusion::error::DataFusionError; + +common_error::define_opaque_error!(Error); + +pub type Result = std::result::Result; + +impl From for DataFusionError { + fn from(e: Error) -> DataFusionError { + DataFusionError::External(Box::new(e)) + } +} diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs new file mode 100644 index 0000000000..b15a6a4c40 --- /dev/null +++ b/src/catalog/src/lib.rs @@ -0,0 +1,8 @@ +mod catalog; +pub mod error; +pub mod memory; +mod schema; + +pub use crate::catalog::{CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef}; +pub use crate::catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +pub use crate::schema::{SchemaProvider, SchemaProviderRef}; diff --git a/src/query/src/catalog/memory.rs b/src/catalog/src/memory.rs similarity index 98% rename from src/query/src/catalog/memory.rs rename to src/catalog/src/memory.rs index 5d1eb5aa4a..fea19ca920 100644 --- a/src/query/src/catalog/memory.rs +++ b/src/catalog/src/memory.rs @@ -7,12 +7,12 @@ use common_error::prelude::*; use table::table::numbers::NumbersTable; use table::TableRef; -use crate::catalog::schema::SchemaProvider; use crate::catalog::{ CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, }; use crate::error::{Error, Result}; +use crate::schema::SchemaProvider; /// Error implementation of memory catalog. #[derive(Debug, Snafu)] @@ -191,7 +191,6 @@ pub fn new_memory_catalog_list() -> Result { #[cfg(test)] mod tests { - use table::table::numbers::NumbersTable; use super::*; diff --git a/src/query/src/catalog/schema.rs b/src/catalog/src/schema.rs similarity index 100% rename from src/query/src/catalog/schema.rs rename to src/catalog/src/schema.rs diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 7d0023ac44..5d8915d283 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -10,6 +10,7 @@ api = { path = "../api" } async-trait = "0.1" axum = "0.5" axum-macros = "0.2" +catalog = { path = "../catalog" } common-error = { path = "../common/error" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } diff --git a/src/datanode/src/catalog.rs b/src/datanode/src/catalog.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/src/datanode/src/catalog.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index bc1c085ed7..1b790ecbf9 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -1,7 +1,6 @@ use std::sync::Arc; -use query::catalog::memory; -use query::catalog::CatalogListRef; +use catalog::CatalogListRef; use snafu::ResultExt; use crate::error::{NewCatalogSnafu, Result}; @@ -35,7 +34,7 @@ pub struct Datanode { impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { - let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?; + let catalog_list = catalog::memory::new_memory_catalog_list().context(NewCatalogSnafu)?; let instance = Arc::new(Instance::new(&opts, catalog_list.clone()).await?); Ok(Self { diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index ed15c0eba6..24c6f76280 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -20,7 +20,7 @@ pub enum Error { #[snafu(display("Fail to create catalog list, source: {}", source))] NewCatalog { #[snafu(backtrace)] - source: query::error::Error, + source: catalog::error::Error, }, #[snafu(display("Fail to create table: {}, {}", table_name, source))] @@ -115,7 +115,8 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(), + Error::ExecuteSql { source } => source.status_code(), + Error::NewCatalog { source } => source.status_code(), Error::CreateTable { source, .. } => source.status_code(), Error::GetTable { source, .. } => source.status_code(), Error::Insert { source, .. } => source.status_code(), @@ -164,6 +165,12 @@ mod tests { ))) } + fn throw_catalog_error() -> std::result::Result<(), catalog::error::Error> { + Err(catalog::error::Error::new(MockError::with_backtrace( + StatusCode::Internal, + ))) + } + fn assert_internal_error(err: &Error) { assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::Internal, err.status_code()); @@ -179,7 +186,10 @@ mod tests { let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap(); assert_internal_error(&err); assert_tonic_internal_error(err); - let err = throw_query_error().context(NewCatalogSnafu).err().unwrap(); + let err = throw_catalog_error() + .context(NewCatalogSnafu) + .err() + .unwrap(); assert_internal_error(&err); assert_tonic_internal_error(err); } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 8dea3c2d3a..f9d1afe27e 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,11 +1,11 @@ use std::{fs, path, sync::Arc}; use api::v1::InsertExpr; +use catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_telemetry::logging::info; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; -use query::catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; use snafu::{OptionExt, ResultExt}; use sql::statements::statement::Statement; @@ -177,14 +177,13 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result Extension { - let catalog_list = memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap()); Extension(instance) diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 98bce48f4e..10e45a5e12 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -1,8 +1,8 @@ //! sql handler mod insert; +use catalog::SchemaProviderRef; use common_error::ext::BoxedError; -use query::catalog::schema::SchemaProviderRef; use query::query_engine::Output; use snafu::{OptionExt, ResultExt}; use sql::statements::statement::Statement; @@ -58,15 +58,13 @@ mod tests { use std::any::Any; use std::sync::Arc; + use catalog::SchemaProvider; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; use log_store::fs::noop::NoopLogStore; - use query::catalog::memory; - use query::catalog::schema::SchemaProvider; - use query::error::Result as QueryResult; use query::QueryEngineFactory; use storage::config::EngineConfig; use storage::EngineImpl; @@ -121,10 +119,14 @@ mod tests { Some(Arc::new(DemoTable {})) } - fn register_table(&self, _name: String, _table: TableRef) -> QueryResult> { + fn register_table( + &self, + _name: String, + _table: TableRef, + ) -> catalog::error::Result> { unimplemented!(); } - fn deregister_table(&self, _name: &str) -> QueryResult> { + fn deregister_table(&self, _name: &str) -> catalog::error::Result> { unimplemented!(); } fn table_exist(&self, name: &str) -> bool { @@ -137,7 +139,7 @@ mod tests { let dir = TempDir::new("setup_test_engine_and_table").unwrap(); let store_dir = dir.path().to_string_lossy(); - let catalog_list = memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); let query_engine = factory.query_engine().clone(); diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index 4294c488f9..a186b3f487 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -1,9 +1,9 @@ use std::str::FromStr; +use catalog::SchemaProviderRef; use datatypes::prelude::ConcreteDataType; use datatypes::prelude::VectorBuilder; use datatypes::value::Value; -use query::catalog::schema::SchemaProviderRef; use query::query_engine::Output; use snafu::ensure; use snafu::OptionExt; diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index b24f4dbaf7..175aef21ea 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -5,14 +5,13 @@ use std::sync::Arc; use axum::http::StatusCode; use axum::Router; use axum_test_helper::TestClient; -use query::catalog::memory; use crate::instance::Instance; use crate::server::http::HttpServer; use crate::test_util; async fn make_test_app() -> Router { - let catalog_list = memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap()); let http_server = HttpServer::new(instance); diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index ada307bcee..ea4a24a20d 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -5,27 +5,28 @@ edition = "2021" [dependencies.arrow] package = "arrow2" -version="0.10" +version = "0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] [dependencies] arc-swap = "1.0" async-trait = "0.1" +catalog = { path = "../catalog" } common-error = { path = "../common/error" } common-function = { path = "../common/function" } common-query = { path = "../common/query" } -common-recordbatch = {path = "../common/recordbatch" } +common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} -datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2" } -datatypes = {path = "../datatypes" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } +datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" metrics = "0.18" snafu = { version = "0.7", features = ["backtraces"] } +sql = { path = "../sql" } table = { path = "../table" } tokio = "1.0" -sql = { path = "../sql" } [dev-dependencies] num = "0.4" diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index e976f1ad9b..5575e41cff 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -7,6 +7,7 @@ mod planner; use std::sync::Arc; +use catalog::CatalogListRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::udf::create_udf; use common_function::scalars::FunctionRef; @@ -21,7 +22,6 @@ pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; use crate::metric; use crate::query_engine::{QueryContext, QueryEngineState}; use crate::{ - catalog::CatalogListRef, datafusion::plan_adapter::PhysicalPlanAdapter, datafusion::planner::{DfContextProviderAdapter, DfPlanner}, error::Result, @@ -213,11 +213,10 @@ mod tests { use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; - use crate::catalog::memory; use crate::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; fn create_test_engine() -> QueryEngineRef { - let catalog_list = memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); factory.query_engine().clone() } diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index 37994eb9ac..580ff783c6 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -3,6 +3,7 @@ use std::any::Any; use std::sync::Arc; +use catalog::{CatalogListRef, CatalogProvider, SchemaProvider}; use datafusion::catalog::{ catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, schema::SchemaProvider as DfSchemaProvider, @@ -16,9 +17,7 @@ use table::{ TableRef, }; -use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider}; use crate::datafusion::error; -use crate::error::Result; pub struct DfCatalogListAdapter { runtime: Arc, @@ -169,7 +168,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { } } -/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter +/// Datafusion SchemaProviderAdapter -> greptime SchemaProviderAdapter struct SchemaProviderAdapter { df_schema_provider: Arc, runtime: Arc, @@ -202,7 +201,11 @@ impl SchemaProvider for SchemaProviderAdapter { }) } - fn register_table(&self, name: String, table: TableRef) -> Result> { + fn register_table( + &self, + name: String, + table: TableRef, + ) -> catalog::error::Result> { let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone())); Ok(self .df_schema_provider @@ -213,7 +216,7 @@ impl SchemaProvider for SchemaProviderAdapter { .map(|_| table)) } - fn deregister_table(&self, name: &str) -> Result> { + fn deregister_table(&self, name: &str) -> catalog::error::Result> { self.df_schema_provider .deregister_table(name) .context(error::DatafusionSnafu { diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index c9a5293155..1d3eafd8a0 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -71,6 +71,12 @@ impl ErrorExt for InnerError { } } +impl From for catalog::error::Error { + fn from(e: InnerError) -> Self { + catalog::error::Error::new(e) + } +} + impl From for Error { fn from(err: InnerError) -> Self { Self::new(err) diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 1253da97d2..d7d89bdbc2 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -9,3 +9,9 @@ impl From for DataFusionError { DataFusionError::External(Box::new(e)) } } + +impl From for Error { + fn from(e: catalog::error::Error) -> Self { + Error::new(e) + } +} diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 8dae3b55f0..5af13ae1b3 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -1,4 +1,3 @@ -pub mod catalog; pub mod database; mod datafusion; pub mod error; diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 9bd49893c2..7d0e0dd0a8 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -3,13 +3,13 @@ mod state; use std::sync::Arc; +use catalog::CatalogList; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY}; use common_query::prelude::ScalarUdf; use common_recordbatch::SendableRecordBatchStream; use sql::statements::statement::Statement; -use crate::catalog::CatalogList; use crate::datafusion::DatafusionQueryEngine; use crate::error::Result; use crate::plan::LogicalPlan; @@ -72,11 +72,10 @@ pub type QueryEngineRef = Arc; #[cfg(test)] mod tests { use super::*; - use crate::catalog::memory; #[test] fn test_query_engine_factory() { - let catalog_list = memory::new_memory_catalog_list().unwrap(); + let catalog_list = catalog::memory::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 6a086dd8cc..ce79c1c995 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -2,11 +2,11 @@ use std::collections::HashMap; use std::fmt; use std::sync::{Arc, RwLock}; +use catalog::CatalogListRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::prelude::ScalarUdf; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; -use crate::catalog::{self, CatalogListRef}; use crate::datafusion::DfCatalogListAdapter; use crate::executor::Runtime; diff --git a/src/query/tests/my_sum_udaf_example.rs b/src/query/tests/my_sum_udaf_example.rs index 9f3d93d789..79ca494c16 100644 --- a/src/query/tests/my_sum_udaf_example.rs +++ b/src/query/tests/my_sum_udaf_example.rs @@ -5,6 +5,8 @@ use std::sync::Arc; mod testing_table; use arc_swap::ArcSwapOption; +use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_function::scalars::aggregate::AggregateFunctionMeta; use common_query::error::CreateAccumulatorSnafu; use common_query::error::Result as QueryResult; @@ -20,9 +22,6 @@ use datatypes::types::PrimitiveType; use datatypes::vectors::PrimitiveVector; use datatypes::with_match_primitive_type_id; use num_traits::AsPrimitive; -use query::catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use query::catalog::schema::SchemaProvider; -use query::catalog::{CatalogList, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use query::error::Result; use query::query_engine::Output; use query::QueryEngineFactory; diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index aa42e3e883..253e4b80db 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -4,6 +4,8 @@ mod testing_table; use std::sync::Arc; use arrow::array::UInt32Array; +use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::prelude::{create_udf, make_scalar_function, Volatility}; use common_recordbatch::error::Result as RecordResult; use common_recordbatch::{util, RecordBatch}; @@ -15,9 +17,6 @@ use datatypes::prelude::*; use datatypes::types::DataTypeBuilder; use datatypes::vectors::PrimitiveVector; use num::NumCast; -use query::catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; -use query::catalog::schema::SchemaProvider; -use query::catalog::{memory, CatalogList, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use query::error::Result; use query::plan::LogicalPlan; use query::query_engine::{Output, QueryEngineFactory}; @@ -32,7 +31,7 @@ use crate::testing_table::TestingTable; #[tokio::test] async fn test_datafusion_query_engine() -> Result<()> { common_telemetry::init_default_ut_logging(); - let catalog_list = memory::new_memory_catalog_list()?; + let catalog_list = catalog::memory::new_memory_catalog_list()?; let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); @@ -77,7 +76,7 @@ async fn test_datafusion_query_engine() -> Result<()> { #[tokio::test] async fn test_udf() -> Result<()> { common_telemetry::init_default_ut_logging(); - let catalog_list = memory::new_memory_catalog_list()?; + let catalog_list = catalog::memory::new_memory_catalog_list()?; let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine();