refactor: move test-util subcrate into table (#334)

* refactor: move test-util subcrate into table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: clean comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move MockTableEngine into test-util

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-10-21 14:39:40 +08:00
committed by GitHub
parent 6b0c5281d4
commit bc9a2df9bf
23 changed files with 201 additions and 146 deletions

19
Cargo.lock generated
View File

@@ -3712,7 +3712,6 @@ dependencies = [
"stats-cli",
"streaming-stats",
"table",
"test-util",
"tokio",
"tokio-stream",
]
@@ -4574,7 +4573,7 @@ dependencies = [
"serde",
"serde_json",
"snafu",
"test-util",
"table",
"tokio",
"tokio-postgres",
"tokio-stream",
@@ -5119,22 +5118,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "test-util"
version = "0.1.0"
dependencies = [
"arrow2",
"async-trait",
"common-query",
"common-recordbatch",
"datafusion",
"datatypes",
"futures",
"snafu",
"table",
"tokio",
]
[[package]]
name = "textwrap"
version = "0.11.0"

View File

@@ -30,5 +30,4 @@ members = [
"src/store-api",
"src/table",
"src/table-engine",
"test-util",
]

View File

@@ -12,7 +12,9 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util = "0.3"

View File

@@ -16,7 +16,7 @@ pub mod error;
mod manager;
pub mod memory;
pub mod schema;
mod system;
pub mod system;
pub mod tables;
/// Represent a list of named catalogs

View File

@@ -328,18 +328,12 @@ pub struct TableEntryValue {
#[cfg(test)]
mod tests {
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::field_util::SchemaExt;
use datatypes::arrow;
use log_store::fs::noop::NoopLogStore;
use object_store::ObjectStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::TableEngine;
use table::metadata::TableType;
use table::metadata::TableType::Base;
use table::requests::{AlterTableRequest, DropTableRequest};
use table::table::adapter::TableAdapter;
use table_engine::config::EngineConfig;
use table_engine::engine::MitoEngine;
use tempdir::TempDir;
@@ -416,84 +410,7 @@ mod tests {
assert!(EntryType::try_from(4).is_err());
}
struct MockTableEngine {
table_name: String,
sole_table: TableRef,
}
impl Default for MockTableEngine {
fn default() -> Self {
Self {
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
sole_table: Arc::new(
TableAdapter::new(
Arc::new(datafusion::datasource::empty::EmptyTable::new(Arc::new(
arrow::datatypes::Schema::empty(),
))),
Arc::new(RuntimeEnv::default()),
)
.unwrap(),
),
}
}
}
#[async_trait::async_trait]
impl TableEngine for MockTableEngine {
fn name(&self) -> &str {
"MockTableEngine"
}
async fn create_table(
&self,
_ctx: &EngineContext,
_request: CreateTableRequest,
) -> table::Result<TableRef> {
unreachable!()
}
async fn open_table(
&self,
_ctx: &EngineContext,
request: OpenTableRequest,
) -> table::Result<Option<TableRef>> {
if request.table_name == self.table_name {
Ok(Some(self.sole_table.clone()))
} else {
Ok(None)
}
}
async fn alter_table(
&self,
_ctx: &EngineContext,
_request: AlterTableRequest,
) -> table::Result<TableRef> {
unreachable!()
}
fn get_table(&self, _ctx: &EngineContext, name: &str) -> table::Result<Option<TableRef>> {
if name == self.table_name {
Ok(Some(self.sole_table.clone()))
} else {
Ok(None)
}
}
fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool {
name == self.table_name
}
async fn drop_table(
&self,
_ctx: &EngineContext,
_request: DropTableRequest,
) -> table::Result<()> {
unreachable!()
}
}
async fn prepare_table_engine() -> (TempDir, TableEngineRef) {
pub async fn prepare_table_engine() -> (TempDir, TableEngineRef) {
let dir = TempDir::new("system-table-test").unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = opendal::services::fs::Builder::default()

View File

@@ -13,7 +13,9 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
@@ -30,7 +32,16 @@ tokio = "1.0"
[dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
features = [
"io_csv",
"io_json",
"io_parquet",
"io_parquet_compression",
"io_ipc",
"ahash",
"compute",
"serde_types",
]
[dev-dependencies]
approx_eq = "0.1"
@@ -43,7 +54,5 @@ rand = "0.8"
statrs = "0.15"
stats-cli = "3.0"
streaming-stats = "0.2"
test-util = { path = "../../test-util" }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"

View File

@@ -14,7 +14,7 @@ use datatypes::vectors::PrimitiveVector;
use query::query_engine::QueryEngineFactory;
use query::QueryEngine;
use rand::Rng;
use test_util::MemTable;
use table::test_util::MemTable;
pub fn create_query_engine() -> Arc<dyn QueryEngine> {
let schema_provider = Arc::new(MemorySchemaProvider::new());

View File

@@ -26,7 +26,7 @@ use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use query::error::Result;
use query::QueryEngineFactory;
use test_util::MemTable;
use table::test_util::MemTable;
#[derive(Debug, Default)]
struct MySumAccumulator<T, SumT>

View File

@@ -18,7 +18,7 @@ use function::{create_query_engine, get_numbers_from_table};
use num_traits::AsPrimitive;
use query::error::Result;
use query::{QueryEngine, QueryEngineFactory};
use test_util::MemTable;
use table::test_util::MemTable;
#[tokio::test]
async fn test_percentile_aggregator() -> Result<()> {

View File

@@ -27,7 +27,7 @@ use query::QueryEngine;
use rand::Rng;
use table::table::adapter::DfTableProviderAdapter;
use table::table::numbers::NumbersTable;
use test_util::MemTable;
use table::test_util::MemTable;
use crate::pow::pow;

View File

@@ -43,6 +43,6 @@ mysql_async = { git = "https://github.com/Morranto/mysql_async.git", rev = "127b
rand = "0.8"
script = { path = "../script", features = ["python"] }
query = { path = "../query" }
test-util = { path = "../../test-util" }
table = { path = "../table" }
tokio-postgres = "0.7"
tokio-test = "0.4"

View File

@@ -6,7 +6,7 @@ use metrics::counter;
use servers::http::handler as http_handler;
use servers::http::handler::ScriptExecution;
use servers::http::{HttpResponse, JsonOutput};
use test_util::MemTable;
use table::test_util::MemTable;
use crate::create_testing_sql_query_handler;

View File

@@ -10,7 +10,7 @@ use common_query::Output;
use query::{QueryEngineFactory, QueryEngineRef};
use servers::error::Result;
use servers::query_handler::{SqlQueryHandler, SqlQueryHandlerRef};
use test_util::MemTable;
use table::test_util::MemTable;
mod http;
mod mysql;

View File

@@ -11,7 +11,7 @@ use rand::Rng;
use servers::error::Result;
use servers::mysql::server::MysqlServer;
use servers::server::Server;
use test_util::MemTable;
use table::test_util::MemTable;
use crate::create_testing_sql_query_handler;
use crate::mysql::{all_datatype_testing_data, MysqlTextRow, TestingData};

View File

@@ -8,7 +8,7 @@ use rand::Rng;
use servers::error::Result;
use servers::postgres::PostgresServer;
use servers::server::Server;
use test_util::MemTable;
use table::test_util::MemTable;
use tokio_postgres::{Client, Error as PgError, NoTls, SimpleQueryMessage};
use crate::create_testing_sql_query_handler;

View File

@@ -10,7 +10,9 @@ common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
derive_builder = "0.11"
@@ -20,9 +22,9 @@ paste = "1.0"
serde = "1.0.136"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tokio = { version = "1.18", features = ["full"] }
[dev-dependencies]
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
tempdir = "0.3"
tokio = { version = "1.18", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }

View File

@@ -4,6 +4,7 @@ pub mod metadata;
pub mod predicate;
pub mod requests;
pub mod table;
pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::table::{Table, TableRef};

View File

@@ -0,0 +1,7 @@
mod empty_table;
mod memtable;
mod mock_engine;
pub use empty_table::EmptyTable;
pub use memtable::MemTable;
pub use mock_engine::MockTableEngine;

View File

@@ -0,0 +1,70 @@
use std::sync::Arc;
use async_trait::async_trait;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use crate::metadata::TableInfoBuilder;
use crate::metadata::TableInfoRef;
use crate::requests::InsertRequest;
use crate::Result;
use crate::{
metadata::{TableMetaBuilder, TableType},
requests::CreateTableRequest,
Table,
};
/// A minimal [`Table`] implementation that stores no rows.
///
/// Intended for tests: it keeps only the table metadata derived from the
/// originating [`CreateTableRequest`].
pub struct EmptyTable {
    /// Metadata built from the creating request; the only state this table holds.
    info: TableInfoRef,
}

impl EmptyTable {
    /// Builds an empty table whose schema and naming come from `req`.
    ///
    /// # Panics
    /// Panics if the metadata builders reject the request contents
    /// (same `unwrap` behavior as before).
    pub fn new(req: CreateTableRequest) -> Self {
        // Assemble the table meta first; column ids start from zero.
        let meta = TableMetaBuilder::default()
            .schema(req.schema)
            .primary_key_indices(req.primary_key_indices)
            .next_column_id(0)
            .options(req.table_options)
            .build()
            .unwrap();

        let info = TableInfoBuilder::default()
            .catalog_name(req.catalog_name)
            .schema_name(req.schema_name)
            .name(req.table_name)
            .meta(meta)
            .table_type(TableType::Temporary)
            .desc(req.desc)
            .build()
            .unwrap();

        Self {
            info: Arc::new(info),
        }
    }
}
#[async_trait]
impl Table for EmptyTable {
fn as_any(&self) -> &dyn std::any::Any {
self as _
}
fn schema(&self) -> datatypes::schema::SchemaRef {
self.info.meta.schema.clone()
}
fn table_info(&self) -> TableInfoRef {
self.info.clone()
}
async fn insert(&self, _request: InsertRequest) -> Result<usize> {
Ok(0)
}
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_filters: &[common_query::prelude::Expr],
_limit: Option<usize>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(EmptyRecordBatchStream::new(self.schema())))
}
}

View File

@@ -12,9 +12,10 @@ use datatypes::vectors::UInt32Vector;
use futures::task::{Context, Poll};
use futures::Stream;
use snafu::prelude::*;
use table::error::{Result, SchemaConversionSnafu, TableProjectionSnafu};
use table::metadata::TableInfoRef;
use table::Table;
use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu};
use crate::metadata::TableInfoRef;
use crate::Table;
#[derive(Debug, Clone)]
pub struct MemTable {

View File

@@ -0,0 +1,87 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::test_util::EmptyTable;
use crate::{
engine::{EngineContext, TableEngine},
requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
Result, TableRef,
};
/// In-memory mock of a table engine for tests.
///
/// Created tables are kept in a map keyed by the
/// `(catalog, schema, table)` name triple.
#[derive(Default)]
pub struct MockTableEngine {
    // Async mutex: the engine's methods are async and may hold the lock across awaits.
    tables: Mutex<HashMap<(String, String, String), TableRef>>,
}

impl MockTableEngine {
    /// Creates an engine with no registered tables; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Default::default()
    }
}
#[async_trait]
impl TableEngine for MockTableEngine {
    fn name(&self) -> &str {
        "MockTableEngine"
    }

    /// Registers an [`EmptyTable`] under the request's full name and returns it.
    async fn create_table(
        &self,
        _ctx: &EngineContext,
        request: CreateTableRequest,
    ) -> Result<TableRef> {
        // Capture the key before `request` is consumed by EmptyTable::new.
        let key = (
            request.catalog_name.clone(),
            request.schema_name.clone(),
            request.table_name.clone(),
        );
        let table: TableRef = Arc::new(EmptyTable::new(request));
        let mut tables = self.tables.lock().await;
        tables.insert(key, table.clone());
        Ok(table)
    }

    /// Looks up a previously created table; `None` when it was never created.
    async fn open_table(
        &self,
        _ctx: &EngineContext,
        request: OpenTableRequest,
    ) -> Result<Option<TableRef>> {
        let key = (
            request.catalog_name,
            request.schema_name,
            request.table_name,
        );
        let found = self.tables.lock().await.get(&key).cloned();
        Ok(found)
    }

    // The operations below are not exercised by current tests.

    async fn alter_table(
        &self,
        _ctx: &EngineContext,
        _request: AlterTableRequest,
    ) -> Result<TableRef> {
        unimplemented!()
    }

    fn get_table(&self, _ctx: &EngineContext, _name: &str) -> Result<Option<TableRef>> {
        unimplemented!()
    }

    fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool {
        unimplemented!()
    }

    async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result<()> {
        unimplemented!()
    }
}

View File

@@ -1,20 +0,0 @@
[package]
name = "test-util"
version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
common-query = { path = "../src/common/query" }
common-recordbatch = { path = "../src/common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datatypes = { path = "../src/datatypes" }
futures = "0.3"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../src/table" }
tokio = { version = "1.20", features = ["full"] }
[dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]

View File

@@ -1,3 +0,0 @@
mod memtable;
pub use memtable::MemTable;