move catalog-related traits and struct to a catalog crate (#134)

This commit is contained in:
Lei, Huang
2022-08-04 11:05:28 +08:00
committed by GitHub
parent 6db6106829
commit 7395920bc8
28 changed files with 121 additions and 56 deletions

14
Cargo.lock generated
View File

@@ -500,6 +500,18 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2698f953def977c68f935bb0dfa959375ad4638570e969e2f1e9f433cbf1af6"
[[package]]
name = "catalog"
version = "0.1.0"
dependencies = [
"common-error",
"common-telemetry",
"datafusion",
"snafu",
"table",
"tokio",
]
[[package]]
name = "cc"
version = "1.0.73"
@@ -1155,6 +1167,7 @@ dependencies = [
"axum",
"axum-macros",
"axum-test-helper",
"catalog",
"common-error",
"common-query",
"common-recordbatch",
@@ -2962,6 +2975,7 @@ dependencies = [
"arc-swap",
"arrow2",
"async-trait",
"catalog",
"common-error",
"common-function",
"common-query",

View File

@@ -1,6 +1,7 @@
[workspace]
members = [
"src/api",
"src/catalog",
"src/client",
"src/common/base",
"src/common/error",

16
src/catalog/Cargo.toml Normal file
View File

@@ -0,0 +1,16 @@
[package]
name = "catalog"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
[dev-dependencies]
tokio = { version = "1.0", features = ["full"] }

View File

@@ -1,9 +1,7 @@
pub mod memory;
pub mod schema;
use std::any::Any;
use std::sync::Arc;
use crate::catalog::schema::SchemaProvider;
use crate::schema::SchemaProvider;
/// Represent a list of named catalogs
pub trait CatalogList: Sync + Send {

11
src/catalog/src/error.rs Normal file
View File

@@ -0,0 +1,11 @@
use datafusion::error::DataFusionError;
common_error::define_opaque_error!(Error);
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for DataFusionError {
fn from(e: Error) -> DataFusionError {
DataFusionError::External(Box::new(e))
}
}

8
src/catalog/src/lib.rs Normal file
View File

@@ -0,0 +1,8 @@
mod catalog;
pub mod error;
pub mod memory;
mod schema;
pub use crate::catalog::{CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef};
pub use crate::catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};

View File

@@ -7,12 +7,12 @@ use common_error::prelude::*;
use table::table::numbers::NumbersTable;
use table::TableRef;
use crate::catalog::schema::SchemaProvider;
use crate::catalog::{
CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
};
use crate::error::{Error, Result};
use crate::schema::SchemaProvider;
/// Error implementation of memory catalog.
#[derive(Debug, Snafu)]
@@ -191,7 +191,6 @@ pub fn new_memory_catalog_list() -> Result<CatalogListRef> {
#[cfg(test)]
mod tests {
use table::table::numbers::NumbersTable;
use super::*;

View File

@@ -10,6 +10,7 @@ api = { path = "../api" }
async-trait = "0.1"
axum = "0.5"
axum-macros = "0.2"
catalog = { path = "../catalog" }
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }

View File

@@ -1 +0,0 @@

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use query::catalog::memory;
use query::catalog::CatalogListRef;
use catalog::CatalogListRef;
use snafu::ResultExt;
use crate::error::{NewCatalogSnafu, Result};
@@ -35,7 +34,7 @@ pub struct Datanode {
impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
let catalog_list = catalog::memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
let instance = Arc::new(Instance::new(&opts, catalog_list.clone()).await?);
Ok(Self {

View File

@@ -20,7 +20,7 @@ pub enum Error {
#[snafu(display("Fail to create catalog list, source: {}", source))]
NewCatalog {
#[snafu(backtrace)]
source: query::error::Error,
source: catalog::error::Error,
},
#[snafu(display("Fail to create table: {}, {}", table_name, source))]
@@ -115,7 +115,8 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(),
Error::ExecuteSql { source } => source.status_code(),
Error::NewCatalog { source } => source.status_code(),
Error::CreateTable { source, .. } => source.status_code(),
Error::GetTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
@@ -164,6 +165,12 @@ mod tests {
)))
}
fn throw_catalog_error() -> std::result::Result<(), catalog::error::Error> {
Err(catalog::error::Error::new(MockError::with_backtrace(
StatusCode::Internal,
)))
}
fn assert_internal_error(err: &Error) {
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::Internal, err.status_code());
@@ -179,7 +186,10 @@ mod tests {
let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap();
assert_internal_error(&err);
assert_tonic_internal_error(err);
let err = throw_query_error().context(NewCatalogSnafu).err().unwrap();
let err = throw_catalog_error()
.context(NewCatalogSnafu)
.err()
.unwrap();
assert_internal_error(&err);
assert_tonic_internal_error(err);
}

View File

@@ -1,11 +1,11 @@
use std::{fs, path, sync::Arc};
use api::v1::InsertExpr;
use catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use query::catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use snafu::{OptionExt, ResultExt};
use sql::statements::statement::Statement;
@@ -177,14 +177,13 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result<LocalFile
mod tests {
use arrow::array::UInt64Array;
use common_recordbatch::util;
use query::catalog::memory;
use super::*;
use crate::test_util;
#[tokio::test]
async fn test_execute_insert() {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts, catalog_list).await.unwrap();
instance.start().await.unwrap();
@@ -204,7 +203,7 @@ mod tests {
#[tokio::test]
async fn test_execute_query() {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts, catalog_list).await.unwrap();

View File

@@ -1,4 +1,3 @@
pub mod catalog;
pub mod datanode;
pub mod error;
pub mod instance;

View File

@@ -43,7 +43,6 @@ mod tests {
use std::sync::Arc;
use metrics::counter;
use query::catalog::memory;
use super::*;
use crate::instance::Instance;
@@ -60,7 +59,7 @@ mod tests {
}
async fn create_extension() -> Extension<InstanceRef> {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap());
Extension(instance)

View File

@@ -1,8 +1,8 @@
//! sql handler
mod insert;
use catalog::SchemaProviderRef;
use common_error::ext::BoxedError;
use query::catalog::schema::SchemaProviderRef;
use query::query_engine::Output;
use snafu::{OptionExt, ResultExt};
use sql::statements::statement::Statement;
@@ -58,15 +58,13 @@ mod tests {
use std::any::Any;
use std::sync::Arc;
use catalog::SchemaProvider;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use log_store::fs::noop::NoopLogStore;
use query::catalog::memory;
use query::catalog::schema::SchemaProvider;
use query::error::Result as QueryResult;
use query::QueryEngineFactory;
use storage::config::EngineConfig;
use storage::EngineImpl;
@@ -121,10 +119,14 @@ mod tests {
Some(Arc::new(DemoTable {}))
}
fn register_table(&self, _name: String, _table: TableRef) -> QueryResult<Option<TableRef>> {
fn register_table(
&self,
_name: String,
_table: TableRef,
) -> catalog::error::Result<Option<TableRef>> {
unimplemented!();
}
fn deregister_table(&self, _name: &str) -> QueryResult<Option<TableRef>> {
fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
unimplemented!();
}
fn table_exist(&self, name: &str) -> bool {
@@ -137,7 +139,7 @@ mod tests {
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
let store_dir = dir.path().to_string_lossy();
let catalog_list = memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let query_engine = factory.query_engine().clone();

View File

@@ -1,9 +1,9 @@
use std::str::FromStr;
use catalog::SchemaProviderRef;
use datatypes::prelude::ConcreteDataType;
use datatypes::prelude::VectorBuilder;
use datatypes::value::Value;
use query::catalog::schema::SchemaProviderRef;
use query::query_engine::Output;
use snafu::ensure;
use snafu::OptionExt;

View File

@@ -5,14 +5,13 @@ use std::sync::Arc;
use axum::http::StatusCode;
use axum::Router;
use axum_test_helper::TestClient;
use query::catalog::memory;
use crate::instance::Instance;
use crate::server::http::HttpServer;
use crate::test_util;
async fn make_test_app() -> Router {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap());
let http_server = HttpServer::new(instance);

View File

@@ -5,27 +5,28 @@ edition = "2021"
[dependencies.arrow]
package = "arrow2"
version="0.10"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dependencies]
arc-swap = "1.0"
async-trait = "0.1"
catalog = { path = "../catalog" }
common-error = { path = "../common/error" }
common-function = { path = "../common/function" }
common-query = { path = "../common/query" }
common-recordbatch = {path = "../common/recordbatch" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2" }
datatypes = {path = "../datatypes" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"
metrics = "0.18"
snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }
table = { path = "../table" }
tokio = "1.0"
sql = { path = "../sql" }
[dev-dependencies]
num = "0.4"

View File

@@ -7,6 +7,7 @@ mod planner;
use std::sync::Arc;
use catalog::CatalogListRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::udf::create_udf;
use common_function::scalars::FunctionRef;
@@ -21,7 +22,6 @@ pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
use crate::metric;
use crate::query_engine::{QueryContext, QueryEngineState};
use crate::{
catalog::CatalogListRef,
datafusion::plan_adapter::PhysicalPlanAdapter,
datafusion::planner::{DfContextProviderAdapter, DfPlanner},
error::Result,
@@ -213,11 +213,10 @@ mod tests {
use datafusion::field_util::FieldExt;
use datafusion::field_util::SchemaExt;
use crate::catalog::memory;
use crate::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
fn create_test_engine() -> QueryEngineRef {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);
factory.query_engine().clone()
}

View File

@@ -3,6 +3,7 @@
use std::any::Any;
use std::sync::Arc;
use catalog::{CatalogListRef, CatalogProvider, SchemaProvider};
use datafusion::catalog::{
catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider},
schema::SchemaProvider as DfSchemaProvider,
@@ -16,9 +17,7 @@ use table::{
TableRef,
};
use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider};
use crate::datafusion::error;
use crate::error::Result;
pub struct DfCatalogListAdapter {
runtime: Arc<RuntimeEnv>,
@@ -169,7 +168,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
}
}
/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter
/// Datafusion SchemaProviderAdapter -> greptime SchemaProviderAdapter
struct SchemaProviderAdapter {
df_schema_provider: Arc<dyn DfSchemaProvider>,
runtime: Arc<RuntimeEnv>,
@@ -202,7 +201,11 @@ impl SchemaProvider for SchemaProviderAdapter {
})
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
fn register_table(
&self,
name: String,
table: TableRef,
) -> catalog::error::Result<Option<TableRef>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
Ok(self
.df_schema_provider
@@ -213,7 +216,7 @@ impl SchemaProvider for SchemaProviderAdapter {
.map(|_| table))
}
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
fn deregister_table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
self.df_schema_provider
.deregister_table(name)
.context(error::DatafusionSnafu {

View File

@@ -71,6 +71,12 @@ impl ErrorExt for InnerError {
}
}
impl From<InnerError> for catalog::error::Error {
fn from(e: InnerError) -> Self {
catalog::error::Error::new(e)
}
}
impl From<InnerError> for Error {
fn from(err: InnerError) -> Self {
Self::new(err)

View File

@@ -9,3 +9,9 @@ impl From<Error> for DataFusionError {
DataFusionError::External(Box::new(e))
}
}
impl From<catalog::error::Error> for Error {
fn from(e: catalog::error::Error) -> Self {
Error::new(e)
}
}

View File

@@ -1,4 +1,3 @@
pub mod catalog;
pub mod database;
mod datafusion;
pub mod error;

View File

@@ -3,13 +3,13 @@ mod state;
use std::sync::Arc;
use catalog::CatalogList;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
use common_query::prelude::ScalarUdf;
use common_recordbatch::SendableRecordBatchStream;
use sql::statements::statement::Statement;
use crate::catalog::CatalogList;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::plan::LogicalPlan;
@@ -72,11 +72,10 @@ pub type QueryEngineRef = Arc<dyn QueryEngine>;
#[cfg(test)]
mod tests {
use super::*;
use crate::catalog::memory;
#[test]
fn test_query_engine_factory() {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();

View File

@@ -2,11 +2,11 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, RwLock};
use catalog::CatalogListRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use crate::catalog::{self, CatalogListRef};
use crate::datafusion::DfCatalogListAdapter;
use crate::executor::Runtime;

View File

@@ -5,6 +5,8 @@ use std::sync::Arc;
mod testing_table;
use arc_swap::ArcSwapOption;
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_function::scalars::aggregate::AggregateFunctionMeta;
use common_query::error::CreateAccumulatorSnafu;
use common_query::error::Result as QueryResult;
@@ -20,9 +22,6 @@ use datatypes::types::PrimitiveType;
use datatypes::vectors::PrimitiveVector;
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use query::catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use query::catalog::schema::SchemaProvider;
use query::catalog::{CatalogList, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use query::error::Result;
use query::query_engine::Output;
use query::QueryEngineFactory;

View File

@@ -4,6 +4,8 @@ mod testing_table;
use std::sync::Arc;
use arrow::array::UInt32Array;
use catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, SchemaProvider, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::{create_udf, make_scalar_function, Volatility};
use common_recordbatch::error::Result as RecordResult;
use common_recordbatch::{util, RecordBatch};
@@ -15,9 +17,6 @@ use datatypes::prelude::*;
use datatypes::types::DataTypeBuilder;
use datatypes::vectors::PrimitiveVector;
use num::NumCast;
use query::catalog::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
use query::catalog::schema::SchemaProvider;
use query::catalog::{memory, CatalogList, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use query::error::Result;
use query::plan::LogicalPlan;
use query::query_engine::{Output, QueryEngineFactory};
@@ -32,7 +31,7 @@ use crate::testing_table::TestingTable;
#[tokio::test]
async fn test_datafusion_query_engine() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = memory::new_memory_catalog_list()?;
let catalog_list = catalog::memory::new_memory_catalog_list()?;
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();
@@ -77,7 +76,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
#[tokio::test]
async fn test_udf() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = memory::new_memory_catalog_list()?;
let catalog_list = catalog::memory::new_memory_catalog_list()?;
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();