From e78c015fc0bd45bf8cb47921321ec411d0d724a0 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Fri, 17 Jun 2022 11:36:49 +0800 Subject: [PATCH] TableEngine and SqlHandler impl (#45) * Impl TableEngine, bridge to storage * Impl sql handler to process insert sql * fix: minor changes and typo * test: add datanode test * test: add table-engine test * fix: code style * refactor: split out insert mod from sql and minor changes by CR * refactor: replace with_context with context --- Cargo.lock | 25 ++ Cargo.toml | 1 + src/common/error/src/status_code.rs | 2 + src/common/query/src/logical_plan/expr.rs | 24 +- src/datanode/Cargo.toml | 7 + src/datanode/src/datanode.rs | 5 +- src/datanode/src/error.rs | 63 +++++ src/datanode/src/instance.rs | 117 ++++++++- src/datanode/src/lib.rs | 1 + src/datanode/src/sql.rs | 179 +++++++++++++ src/datanode/src/sql/insert.rs | 263 +++++++++++++++++++ src/datatypes/src/data_type.rs | 8 + src/datatypes/src/prelude.rs | 4 +- src/query/src/datafusion.rs | 21 +- src/query/src/datafusion/planner.rs | 4 +- src/query/src/query_engine.rs | 5 + src/sql/src/ast.rs | 3 +- src/sql/src/statements/insert.rs | 46 +++- src/storage/src/test_util/descriptor_util.rs | 1 + src/store-api/src/storage.rs | 6 +- src/store-api/src/storage/descriptors.rs | 15 ++ src/store-api/src/storage/engine.rs | 2 +- src/store-api/src/storage/region.rs | 4 +- src/store-api/src/storage/requests.rs | 2 +- src/table-engine/Cargo.toml | 20 ++ src/table-engine/src/engine.rs | 251 ++++++++++++++++++ src/table-engine/src/error.rs | 35 +++ src/table-engine/src/lib.rs | 3 + src/table-engine/src/table.rs | 94 +++++++ src/table/src/engine.rs | 44 +++- src/table/src/error.rs | 3 + src/table/src/lib.rs | 4 +- src/table/src/metadata.rs | 168 ++++++++++++ src/table/src/requests.rs | 24 ++ src/table/src/table.rs | 70 +---- src/table/src/table/adapter.rs | 24 +- 36 files changed, 1438 insertions(+), 110 deletions(-) create mode 100644 src/datanode/src/sql.rs create mode 100644 src/datanode/src/sql/insert.rs create mode 100644 src/table-engine/Cargo.toml create mode 100644 src/table-engine/src/engine.rs create mode 100644 src/table-engine/src/error.rs create mode 100644 src/table-engine/src/lib.rs create mode 100644 src/table-engine/src/table.rs create mode 100644 src/table/src/metadata.rs create mode 100644 src/table/src/requests.rs diff --git a/Cargo.lock b/Cargo.lock index 5fadce9985..55efad4511 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -914,19 +914,26 @@ name = "datanode" version = "0.1.0" dependencies = [ "arrow2", + "async-trait", "axum", "axum-macros", "axum-test-helper", "common-error", + "common-query", "common-recordbatch", "common-telemetry", + "datatypes", "hyper", "metrics", "query", "serde", "serde_json", "snafu", + "sql", + "storage", + "store-api", "table", + "table-engine", "tokio", "tower", "tower-http", @@ -3134,6 +3141,24 @@ dependencies = [ "snafu", ] +[[package]] +name = "table-engine" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "common-error", + "common-query", + "common-recordbatch", + "common-telemetry", + "datatypes", + "snafu", + "storage", + "store-api", + "table", + "tokio", +] + [[package]] name = "tempdir" version = "0.3.7" diff --git a/Cargo.toml b/Cargo.toml index ba7660ec76..a2d48d13d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,4 +17,5 @@ members = [ "src/storage", "src/store-api", "src/table", + "src/table-engine", ] diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index d5a5745dfa..b2b4422ab4 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -31,6 +31,8 @@ pub enum StatusCode { // ====== Begin of catalog related status code ===== /// Table already exists. TableAlreadyExists, + TableNotFound, + TableColumnNotFound, // ====== End of catalog related status code ======= } diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs index 1a55d1d350..b190e095ee 100644 --- a/src/common/query/src/logical_plan/expr.rs +++ b/src/common/query/src/logical_plan/expr.rs @@ -8,11 +8,27 @@ pub struct Expr { } impl Expr { - pub fn new(df_expr: DfExpr) -> Self { - Self { df_expr } - } - pub fn df_expr(&self) -> &DfExpr { &self.df_expr } } + +impl From for Expr { + fn from(df_expr: DfExpr) -> Self { + Self { df_expr } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_from_df_expr() { + let df_expr = DfExpr::Wildcard; + + let expr: Expr = df_expr.into(); + + assert_eq!(DfExpr::Wildcard, *expr.df_expr()); + } +} diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 541314eb2f..11419444d2 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -6,24 +6,31 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" axum = "0.5" axum-macros = "0.2" common-error = { path = "../common/error" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } +datatypes = { path = "../datatypes"} hyper = { version = "0.14", features = ["full"] } metrics = "0.18" query = { path = "../query" } serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } +sql = { path = "../sql" } +storage = { path = "../storage" } +store-api = { path = "../store-api" } table = { path = "../table" } +table-engine = { path = "../table-engine" } tokio = { version = "1.18", features = ["full"] } tower = { version = "0.4", features = ["full"]} tower-http = { version ="0.3", features = ["full"]} [dev-dependencies] axum-test-helper = "0.1" +common-query = { path = "../common/query" } [dev-dependencies.arrow] package = "arrow2" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index ab1fef2967..77247636fa 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -18,7 +18,7 @@ pub struct Datanode { opts: DatanodeOptions, services: Services, _catalog_list: CatalogListRef, - _instance: InstanceRef, + instance: InstanceRef, } impl Datanode { @@ -30,11 +30,12 @@ impl Datanode { opts, services: Services::new(instance.clone()), _catalog_list: catalog_list, - _instance: instance, + instance, }) } pub async fn start(&self) -> Result<()> { + self.instance.start().await?; self.services.start(&self.opts).await } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6d97d0bc23..0a69691185 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,6 +1,12 @@ use std::any::Any; use common_error::prelude::*; +use datatypes::prelude::ConcreteDataType; +use table::error::Error as TableError; +use table_engine::error::Error as TableEngineError; + +// TODO(boyan): use ErrorExt instead. +pub type BoxedError = Box; /// Business error of datanode. #[derive(Debug, Snafu)] @@ -18,6 +24,55 @@ pub enum Error { source: query::error::Error, }, + #[snafu(display("Fail to create table: {}, {}", table_name, source))] + CreateTable { + table_name: String, + source: TableEngineError, + }, + + #[snafu(display("Fail to get table: {}, {}", table_name, source))] + GetTable { + table_name: String, + source: BoxedError, + }, + + #[snafu(display("Table not found: {}", table_name))] + TableNotFound { table_name: String }, + + #[snafu(display("Column {} not found in table {}", column_name, table_name))] + ColumnNotFound { + column_name: String, + table_name: String, + }, + + #[snafu(display( + "Columns and values number mismatch, columns: {}, values: {}", + columns, + values + ))] + ColumnValuesNumberMismatch { columns: usize, values: usize }, + + #[snafu(display("Fail to parse value: {}, {}", msg, backtrace))] + ParseSqlValue { msg: String, backtrace: Backtrace }, + + #[snafu(display( + "Column {} expect type: {:?}, actual: {:?}", + column_name, + expect, + actual + ))] + ColumnTypeMismatch { + column_name: String, + expect: ConcreteDataType, + actual: ConcreteDataType, + }, + + #[snafu(display("Fail to insert value to table: {}, {}", table_name, source))] + Insert { + table_name: String, + source: TableError, + }, + // The error source of http error is clear even without backtrace now so // a backtrace is not carried in this varaint. #[snafu(display("Fail to start HTTP server, source: {}", source))] @@ -38,6 +93,14 @@ impl ErrorExt for Error { Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(), // TODO(yingwen): Further categorize http error. Error::StartHttp { .. } | Error::ParseAddr { .. } => StatusCode::Internal, + Error::CreateTable { source, .. } => source.status_code(), + Error::GetTable { .. } => StatusCode::Internal, + Error::TableNotFound { .. } => StatusCode::TableNotFound, + Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, + Error::ColumnValuesNumberMismatch { .. } + | Error::ParseSqlValue { .. } + | Error::ColumnTypeMismatch { .. } => StatusCode::InvalidArguments, + Error::Insert { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index b2ee532977..6fbb4751cd 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,17 +1,30 @@ use std::sync::Arc; -use query::catalog::CatalogListRef; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema}; +use query::catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; use snafu::ResultExt; +use sql::statements::statement::Statement; +use storage::EngineImpl; +use table::engine::EngineContext; +use table::engine::TableEngine; +use table::requests::CreateTableRequest; +use table_engine::engine::MitoEngine; -use crate::error::{ExecuteSqlSnafu, Result}; +use crate::error::{CreateTableSnafu, ExecuteSqlSnafu, Result}; +use crate::sql::SqlHandler; + +type DefaultEngine = MitoEngine; // An abstraction to read/write services. pub struct Instance { // Query service query_engine: QueryEngineRef, + table_engine: DefaultEngine, + sql_handler: SqlHandler, // Catalog list - _catalog_list: CatalogListRef, + catalog_list: CatalogListRef, } pub type InstanceRef = Arc; @@ -20,22 +33,86 @@ impl Instance { pub fn new(catalog_list: CatalogListRef) -> Self { let factory = QueryEngineFactory::new(catalog_list.clone()); let query_engine = factory.query_engine().clone(); + let table_engine = DefaultEngine::new(EngineImpl::new()); + Self { query_engine, - _catalog_list: catalog_list, + sql_handler: SqlHandler::new(table_engine.clone()), + table_engine, + catalog_list, } } pub async fn execute_sql(&self, sql: &str) -> Result { - let logical_plan = self + let stmt = self .query_engine - .sql_to_plan(sql) + .sql_to_statement(sql) .context(ExecuteSqlSnafu)?; - self.query_engine - .execute(&logical_plan) + match stmt { + Statement::Query(_) => { + let logical_plan = self + .query_engine + .statement_to_plan(stmt) + .context(ExecuteSqlSnafu)?; + + self.query_engine + .execute(&logical_plan) + .await + .context(ExecuteSqlSnafu) + } + Statement::Insert(_) => { + let schema_provider = self + .catalog_list + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap(); + + let request = self + .sql_handler + .statement_to_request(schema_provider, stmt)?; + self.sql_handler.execute(request).await + } + _ => unimplemented!(), + } + } + + pub async fn start(&self) -> Result<()> { + // FIXME(boyan): create a demo table for test + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ]; + + let table_name = "demo"; + let table = self + .table_engine + .create_table( + &EngineContext::default(), + CreateTableRequest { + name: table_name.to_string(), + desc: Some(" a test table".to_string()), + schema: Arc::new(Schema::new(column_schemas)), + }, + ) .await - .context(ExecuteSqlSnafu) + .context(CreateTableSnafu { table_name })?; + + let schema_provider = self + .catalog_list + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap(); + + schema_provider + .register_table(table_name.to_string(), table) + .unwrap(); + + Ok(()) } } @@ -48,7 +125,27 @@ mod tests { use super::*; #[tokio::test] - async fn test_execute_sql() { + async fn test_execute_insert() { + let catalog_list = memory::new_memory_catalog_list().unwrap(); + + let instance = Instance::new(catalog_list); + instance.start().await.unwrap(); + + let output = instance + .execute_sql( + r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await + .unwrap(); + + assert!(matches!(output, Output::AffectedRows(2))); + } + + #[tokio::test] + async fn test_execute_query() { let catalog_list = memory::new_memory_catalog_list().unwrap(); let instance = Instance::new(catalog_list); diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index c466613256..8868477085 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -4,6 +4,7 @@ pub mod error; pub mod instance; mod metric; pub mod server; +mod sql; pub use crate::datanode::Datanode; pub use crate::datanode::DatanodeOptions; diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs new file mode 100644 index 0000000000..59f317c474 --- /dev/null +++ b/src/datanode/src/sql.rs @@ -0,0 +1,179 @@ +//! sql handler + +mod insert; +use query::catalog::schema::SchemaProviderRef; +use query::query_engine::Output; +use snafu::{OptionExt, ResultExt}; +use sql::statements::statement::Statement; +use table::engine::{EngineContext, TableEngine}; +use table::requests::*; +use table::TableRef; + +use crate::error::{GetTableSnafu, Result, TableNotFoundSnafu}; + +pub enum SqlRequest { + Insert(InsertRequest), +} + +// Handler to execute SQL except query +pub struct SqlHandler { + table_engine: Engine, +} + +impl SqlHandler { + pub fn new(table_engine: Engine) -> Self { + Self { table_engine } + } + + pub async fn execute(&self, request: SqlRequest) -> Result { + match request { + SqlRequest::Insert(req) => self.insert(req).await, + } + } + + pub(crate) fn get_table(&self, table_name: &str) -> Result { + self.table_engine + .get_table(&EngineContext::default(), table_name) + .map_err(|e| Box::new(e) as _) + .context(GetTableSnafu { table_name })? + .context(TableNotFoundSnafu { table_name }) + } + + // Cast sql statement into sql request + pub(crate) fn statement_to_request( + &self, + schema_provider: SchemaProviderRef, + statement: Statement, + ) -> Result { + match statement { + Statement::Insert(stmt) => self.insert_to_request(schema_provider, *stmt), + _ => unimplemented!(), + } + } +} + +#[cfg(test)] +mod tests { + use std::any::Any; + use std::sync::Arc; + + use common_query::logical_plan::Expr; + use common_recordbatch::SendableRecordBatchStream; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; + use datatypes::value::Value; + use query::catalog::memory; + use query::catalog::schema::SchemaProvider; + use query::error::Result as QueryResult; + use query::QueryEngineFactory; + use storage::EngineImpl; + use table::error::Result as TableResult; + use table::{Table, TableRef}; + use table_engine::engine::MitoEngine; + + use super::*; + + struct DemoTable; + + #[async_trait::async_trait] + impl Table for DemoTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ]; + + Arc::new(Schema::new(column_schemas)) + } + async fn scan( + &self, + _projection: &Option>, + _filters: &[Expr], + _limit: Option, + ) -> TableResult { + unimplemented!(); + } + } + + struct MockSchemaProvider; + + impl SchemaProvider for MockSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + vec!["demo".to_string()] + } + + fn table(&self, name: &str) -> Option { + assert_eq!(name, "demo"); + Some(Arc::new(DemoTable {})) + } + + fn register_table(&self, _name: String, _table: TableRef) -> QueryResult> { + unimplemented!(); + } + fn deregister_table(&self, _name: &str) -> QueryResult> { + unimplemented!(); + } + fn table_exist(&self, name: &str) -> bool { + name == "demo" + } + } + + #[test] + fn test_statement_to_request() { + let catalog_list = memory::new_memory_catalog_list().unwrap(); + let factory = QueryEngineFactory::new(catalog_list); + let query_engine = factory.query_engine().clone(); + + let sql = r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#; + + let table_engine = MitoEngine::::new(EngineImpl::new()); + let sql_handler = SqlHandler::new(table_engine); + + let stmt = query_engine.sql_to_statement(sql).unwrap(); + let schema_provider = Arc::new(MockSchemaProvider {}); + let request = sql_handler + .statement_to_request(schema_provider, stmt) + .unwrap(); + + match request { + SqlRequest::Insert(req) => { + assert_eq!(req.table_name, "demo"); + let columns_values = req.columns_values; + assert_eq!(4, columns_values.len()); + + let hosts = &columns_values["host"]; + assert_eq!(2, hosts.len()); + assert_eq!(Value::from("host1"), hosts.get(0)); + assert_eq!(Value::from("host2"), hosts.get(1)); + + let cpus = &columns_values["cpu"]; + assert_eq!(2, cpus.len()); + assert_eq!(Value::from(66.6f64), cpus.get(0)); + assert_eq!(Value::from(88.8f64), cpus.get(1)); + + let memories = &columns_values["memory"]; + assert_eq!(2, memories.len()); + assert_eq!(Value::from(1024f64), memories.get(0)); + assert_eq!(Value::from(333.3f64), memories.get(1)); + + let ts = &columns_values["ts"]; + assert_eq!(2, ts.len()); + assert_eq!(Value::from(1655276557000i64), ts.get(0)); + assert_eq!(Value::from(1655276558000i64), ts.get(1)); + } + } + } +} diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs new file mode 100644 index 0000000000..4294c488f9 --- /dev/null +++ b/src/datanode/src/sql/insert.rs @@ -0,0 +1,263 @@ +use std::str::FromStr; + +use datatypes::prelude::ConcreteDataType; +use datatypes::prelude::VectorBuilder; +use datatypes::value::Value; +use query::catalog::schema::SchemaProviderRef; +use query::query_engine::Output; +use snafu::ensure; +use snafu::OptionExt; +use snafu::ResultExt; +use sql::ast::Value as SqlValue; +use sql::statements::insert::Insert; +use table::engine::TableEngine; +use table::requests::*; + +use crate::error::{ + ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, + ParseSqlValueSnafu, Result, TableNotFoundSnafu, +}; +use crate::sql::{SqlHandler, SqlRequest}; + +impl SqlHandler { + pub(crate) async fn insert(&self, req: InsertRequest) -> Result { + let table_name = &req.table_name.to_string(); + let table = self.get_table(table_name)?; + + let affected_rows = table + .insert(req) + .await + .context(InsertSnafu { table_name })?; + + Ok(Output::AffectedRows(affected_rows)) + } + + pub(crate) fn insert_to_request( + &self, + schema_provider: SchemaProviderRef, + stmt: Insert, + ) -> Result { + let columns = stmt.columns(); + let values = stmt.values(); + //TODO(dennis): table name may be in the form of `catalog.schema.table`, + // but we don't process it right now. + let table_name = stmt.table_name(); + + let table = schema_provider + .table(&table_name) + .context(TableNotFoundSnafu { + table_name: &table_name, + })?; + let schema = table.schema(); + let columns_num = if columns.is_empty() { + schema.column_schemas().len() + } else { + columns.len() + }; + let rows_num = values.len(); + + let mut columns_builders: Vec<(&String, &ConcreteDataType, VectorBuilder)> = + Vec::with_capacity(columns_num); + + if columns.is_empty() { + for column_schema in schema.column_schemas() { + let data_type = &column_schema.data_type; + columns_builders.push(( + &column_schema.name, + data_type, + VectorBuilder::with_capacity(data_type.clone(), rows_num), + )); + } + } else { + for column_name in columns { + let column_schema = + schema.column_schema_by_name(column_name).with_context(|| { + ColumnNotFoundSnafu { + table_name: &table_name, + column_name: column_name.to_string(), + } + })?; + let data_type = &column_schema.data_type; + columns_builders.push(( + column_name, + data_type, + VectorBuilder::with_capacity(data_type.clone(), rows_num), + )); + } + } + + // Convert rows into columns + for row in values { + ensure!( + row.len() == columns_num, + ColumnValuesNumberMismatchSnafu { + columns: columns_num, + values: row.len(), + } + ); + + for (sql_val, (column_name, data_type, builder)) in + row.iter().zip(columns_builders.iter_mut()) + { + add_row_to_vector(column_name, data_type, sql_val, builder)?; + } + } + + Ok(SqlRequest::Insert(InsertRequest { + table_name, + columns_values: columns_builders + .into_iter() + .map(|(c, _, mut b)| (c.to_owned(), b.finish())) + .collect(), + })) + } +} + +fn add_row_to_vector( + column_name: &str, + data_type: &ConcreteDataType, + sql_val: &SqlValue, + builder: &mut VectorBuilder, +) -> Result<()> { + let value = parse_sql_value(column_name, data_type, sql_val)?; + builder.push(&value); + + Ok(()) +} + +fn parse_sql_value( + column_name: &str, + data_type: &ConcreteDataType, + sql_val: &SqlValue, +) -> Result { + Ok(match sql_val { + SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?, + SqlValue::Null => Value::Null, + SqlValue::Boolean(b) => { + ensure!( + data_type.is_boolean(), + ColumnTypeMismatchSnafu { + column_name, + expect: data_type.clone(), + actual: ConcreteDataType::boolean_datatype(), + } + ); + + (*b).into() + } + SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => { + ensure!( + data_type.is_string(), + ColumnTypeMismatchSnafu { + column_name, + expect: data_type.clone(), + actual: ConcreteDataType::string_datatype(), + } + ); + + s.to_owned().into() + } + + _ => todo!("Other sql value"), + }) +} + +macro_rules! parse_number_to_value { + ($data_type: expr, $n: ident, $(($Type: ident, $PrimitiveType: ident)), +) => { + match $data_type { + $( + ConcreteDataType::$Type(_) => { + let n = parse_sql_number::<$PrimitiveType>($n)?; + Ok(Value::from(n)) + }, + )+ + _ => ParseSqlValueSnafu { + msg: format!("Fail to parse number {}, invalid column type: {:?}", + $n, $data_type + )}.fail(), + } + } +} + +fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Result { + parse_number_to_value!( + data_type, + n, + (UInt8, u8), + (UInt16, u16), + (UInt32, u32), + (UInt64, u64), + (Int8, i8), + (Int16, i16), + (Int32, i32), + (Int64, i64), + (Float64, f64), + (Float32, f32) + ) +} + +fn parse_sql_number(n: &str) -> Result +where + ::Err: std::fmt::Debug, +{ + match n.parse::() { + Ok(n) => Ok(n), + Err(e) => ParseSqlValueSnafu { + msg: format!("Fail to parse number {}, {:?}", n, e), + } + .fail(), + } +} + +#[cfg(test)] +mod tests { + use datatypes::value::OrderedFloat; + + use super::*; + + #[test] + fn test_sql_number_to_value() { + let v = sql_number_to_value(&ConcreteDataType::float64_datatype(), "3.0").unwrap(); + assert_eq!(Value::Float64(OrderedFloat(3.0)), v); + + let v = sql_number_to_value(&ConcreteDataType::int32_datatype(), "999").unwrap(); + assert_eq!(Value::Int32(999), v); + + let v = sql_number_to_value(&ConcreteDataType::string_datatype(), "999"); + assert!(v.is_err(), "parse value error is: {:?}", v); + } + + #[test] + fn test_parse_sql_value() { + let sql_val = SqlValue::Null; + assert_eq!( + Value::Null, + parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap() + ); + + let sql_val = SqlValue::Boolean(true); + assert_eq!( + Value::Boolean(true), + parse_sql_value("a", &ConcreteDataType::boolean_datatype(), &sql_val).unwrap() + ); + + let sql_val = SqlValue::Number("3.0".to_string(), false); + assert_eq!( + Value::Float64(OrderedFloat(3.0)), + parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap() + ); + + let sql_val = SqlValue::Number("3.0".to_string(), false); + let v = parse_sql_value("a", &ConcreteDataType::boolean_datatype(), &sql_val); + assert!(v.is_err()); + assert!(format!("{:?}", v) + .contains("Fail to parse number 3.0, invalid column type: Boolean(BooleanType)")); + + let sql_val = SqlValue::Boolean(true); + let v = parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val); + assert!(v.is_err()); + assert!(format!("{:?}", v).contains( + "column_name: \"a\", expect: Float64(Float64), actual: Boolean(BooleanType)" + )); + } +} diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 2f2277ca82..c814d70448 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -42,6 +42,14 @@ impl ConcreteDataType { ) } + pub fn is_boolean(&self) -> bool { + matches!(self, ConcreteDataType::Boolean(_)) + } + + pub fn is_string(&self) -> bool { + matches!(self, ConcreteDataType::String(_)) + } + pub fn is_signed(&self) -> bool { matches!( self, diff --git a/src/datatypes/src/prelude.rs b/src/datatypes/src/prelude.rs index fc460cd473..527ea967ef 100644 --- a/src/datatypes/src/prelude.rs +++ b/src/datatypes/src/prelude.rs @@ -3,4 +3,6 @@ pub use crate::macros::*; pub use crate::scalars::{Scalar, ScalarRef, ScalarVector, ScalarVectorBuilder}; pub use crate::type_id::LogicalTypeId; pub use crate::value::Value; -pub use crate::vectors::{Helper as VectorHelper, MutableVector, Validity, Vector, VectorRef}; +pub use crate::vectors::{ + Helper as VectorHelper, MutableVector, Validity, Vector, VectorBuilder, VectorRef, +}; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 52a2abe8da..74fc69dbdd 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -13,6 +13,7 @@ use common_query::prelude::ScalarUdf; use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use common_telemetry::timer; use snafu::{OptionExt, ResultExt}; +use sql::statements::statement::Statement; use sql::{dialect::GenericDialect, parser::ParserContext}; pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; @@ -50,15 +51,25 @@ impl QueryEngine for DatafusionQueryEngine { "datafusion" } - fn sql_to_plan(&self, sql: &str) -> Result { - let _timer = timer!(metric::METRIC_PARSE_SQL_ELAPSED); - let context_provider = DfContextProviderAdapter::new(self.state.clone()); - let planner = DfPlanner::new(&context_provider); + fn sql_to_statement(&self, sql: &str) -> Result { let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {}) .context(error::ParseSqlSnafu)?; // TODO(dennis): supports multi statement in one sql? assert!(1 == statement.len()); - planner.statement_to_plan(statement.remove(0)) + Ok(statement.remove(0)) + } + + fn statement_to_plan(&self, stmt: Statement) -> Result { + let context_provider = DfContextProviderAdapter::new(self.state.clone()); + let planner = DfPlanner::new(&context_provider); + + planner.statement_to_plan(stmt) + } + + fn sql_to_plan(&self, sql: &str) -> Result { + let _timer = timer!(metric::METRIC_PARSE_SQL_ELAPSED); + let stmt = self.sql_to_statement(sql)?; + self.statement_to_plan(stmt) } async fn execute(&self, plan: &LogicalPlan) -> Result { diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 615247bf3d..68cf31628e 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -48,9 +48,7 @@ where todo!("Currently not supported") } Statement::Query(qb) => self.query_to_plan(qb), - Statement::Insert(_) => { - todo!() - } + Statement::Insert(_) => unreachable!(), } } } diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index e79a76618c..c9595ccfc3 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY}; use common_query::prelude::ScalarUdf; use common_recordbatch::SendableRecordBatchStream; +use sql::statements::statement::Statement; use crate::catalog::CatalogList; use crate::datafusion::DatafusionQueryEngine; @@ -24,6 +25,10 @@ pub enum Output { pub trait QueryEngine: Send + Sync { fn name(&self) -> &str; + fn sql_to_statement(&self, sql: &str) -> Result; + + fn statement_to_plan(&self, stmt: Statement) -> Result; + fn sql_to_plan(&self, sql: &str) -> Result; async fn execute(&self, plan: &LogicalPlan) -> Result; diff --git a/src/sql/src/ast.rs b/src/sql/src/ast.rs index 8b13789179..317527540b 100644 --- a/src/sql/src/ast.rs +++ b/src/sql/src/ast.rs @@ -1 +1,2 @@ - +pub use sqlparser::ast::Expr; +pub use sqlparser::ast::Value; diff --git a/src/sql/src/statements/insert.rs b/src/sql/src/statements/insert.rs index 875ccfc927..6dca41beaf 100644 --- a/src/sql/src/statements/insert.rs +++ b/src/sql/src/statements/insert.rs @@ -1,12 +1,56 @@ -use sqlparser::ast::Statement; +use sqlparser::ast::{SetExpr, Statement, Values}; use sqlparser::parser::ParserError; +use crate::ast::{Expr, Value}; + #[derive(Debug, Clone, PartialEq)] pub struct Insert { // Can only be sqlparser::ast::Statement::Insert variant pub inner: Statement, } +impl Insert { + pub fn table_name(&self) -> String { + match &self.inner { + Statement::Insert { table_name, .. } => { + // FIXME(dennis): table_name may be in the form of "catalog.schema.table" + table_name.to_string() + } + _ => unreachable!(), + } + } + + pub fn columns(&self) -> Vec<&String> { + match &self.inner { + Statement::Insert { columns, .. } => columns.iter().map(|ident| &ident.value).collect(), + _ => unreachable!(), + } + } + + pub fn values(&self) -> Vec> { + match &self.inner { + Statement::Insert { source, .. } => match &source.body { + SetExpr::Values(Values(values)) => values + .iter() + .map(|v| { + v.iter() + .map(|expr| match expr { + Expr::Value(v) => v.clone(), + Expr::Identifier(ident) => { + Value::SingleQuotedString(ident.value.clone()) + } + _ => unreachable!(), + }) + .collect::>() + }) + .collect(), + _ => unreachable!(), + }, + _ => unreachable!(), + } + } +} + impl TryFrom for Insert { type Error = ParserError; diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs index bfe642f874..81da5cb52d 100644 --- a/src/storage/src/test_util/descriptor_util.rs +++ b/src/storage/src/test_util/descriptor_util.rs @@ -57,6 +57,7 @@ impl RegionDescBuilder { pub fn build(self) -> RegionDescriptor { RegionDescriptor { + id: 0, name: self.name, row_key: self.key_builder.build(), default_cf: self.default_cf_builder.build(), diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 776894b76b..29b4d47925 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -14,9 +14,9 @@ pub use datatypes::data_type::ConcreteDataType; pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; pub use self::descriptors::{ - ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, - ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RowKeyDescriptor, - RowKeyDescriptorBuilder, + gen_region_name, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, + ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RegionId, + RowKeyDescriptor, RowKeyDescriptorBuilder, }; pub use self::engine::{EngineContext, StorageEngine}; pub use self::metadata::RegionMeta; diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index b9d0ecb2b8..5295a457ca 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -6,6 +6,14 @@ use crate::storage::{consts, ColumnSchema, ConcreteDataType}; pub type ColumnId = u32; /// Id of column family, unique in each region. pub type ColumnFamilyId = u32; +pub type RegionId = u32; +/// Default region name prefix +pub const REGION_PREFIX: &str = "r_"; + +#[inline] +pub fn gen_region_name(id: RegionId) -> String { + format!("{}{}", REGION_PREFIX, id) +} // TODO(yingwen): Validate default value has same type with column, and name is a valid column name. /// A [ColumnDescriptor] contains information to create a column. @@ -52,6 +60,7 @@ pub struct ColumnFamilyDescriptor { /// A [RegionDescriptor] contains information to create a region. #[derive(Debug, Clone, PartialEq)] pub struct RegionDescriptor { + pub id: RegionId, /// Region name. pub name: String, /// Row key descriptor of this region. @@ -286,4 +295,10 @@ mod tests { .build(); assert_eq!(1, desc.columns.len()); } + + #[test] + fn test_gen_region_name() { + assert_eq!("r_0", gen_region_name(0)); + assert_eq!("r_99", gen_region_name(99)); + } } diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index a74dd39e42..90fab46b01 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -12,7 +12,7 @@ use crate::storage::region::Region; /// Storage engine provides primitive operations to store and access data. #[async_trait] -pub trait StorageEngine: Send + Sync + Clone { +pub trait StorageEngine: Send + Sync + Clone + 'static { type Error: ErrorExt + Send + Sync; type Region: Region; diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index f6d251f177..8a43dcbb83 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -28,7 +28,7 @@ use crate::storage::snapshot::{ReadContext, Snapshot}; /// Chunks of rows in storage engine. #[async_trait] -pub trait Region: Send + Sync + Clone { +pub trait Region: Send + Sync + Clone + 'static { type Error: ErrorExt + Send + Sync; type Meta: RegionMeta; type WriteRequest: WriteRequest; @@ -52,5 +52,5 @@ pub trait Region: Send + Sync + Clone { } /// Context for write operations. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct WriteContext {} diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index b83b2d57f4..bb41344664 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -13,7 +13,7 @@ pub trait WriteRequest: Send { } /// Put multiple rows. -pub trait PutOperation { +pub trait PutOperation: Send { type Error: ErrorExt + Send + Sync; fn new() -> Self; diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml new file mode 100644 index 0000000000..ac2ea093af --- /dev/null +++ b/src/table-engine/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "table-engine" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +chrono = { version = "0.4", features = ["serde"] } +common-error = {path = "../common/error" } +common-query = {path = "../common/query" } +common-recordbatch = {path = "../common/recordbatch" } +common-telemetry = {path = "../common/telemetry" } +snafu = { version = "0.7", features = ["backtraces"] } +storage ={ path = "../storage" } +store-api ={ path = "../store-api" } +table = { path = "../table" } + +[dev-dependencies] +datatypes = { path = "../datatypes" } +tokio = { version = "1.18", features = ["full"] } \ No newline at end of file diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs new file mode 100644 index 0000000000..45b0b2e6a3 --- /dev/null +++ b/src/table-engine/src/engine.rs @@ -0,0 +1,251 @@ +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::sync::RwLock; + +use async_trait::async_trait; +use snafu::ResultExt; +use store_api::storage::ConcreteDataType; +use store_api::storage::{ + self as store, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, + EngineContext as StorageContext, Region, RegionDescriptor, RegionId, RegionMeta, + RowKeyDescriptorBuilder, StorageEngine, +}; +use table::engine::{EngineContext, TableEngine}; +use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest}; +use table::{ + metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType}, + table::TableRef, +}; + +use crate::error::{CreateTableSnafu, Error, Result}; +use crate::table::MitoTable; + +pub const DEFAULT_ENGINE: &str = "mito"; + +/// [TableEngine] implementation. +/// +/// About mito . +/// "you can't be a true petrolhead until you've owned an Alfa Romeo" -- by Jeremy Clarkson +#[derive(Clone)] +pub struct MitoEngine { + inner: Arc>, +} + +impl MitoEngine { + pub fn new(storage_engine: Store) -> Self { + Self { + inner: Arc::new(MitoEngineInner::new(storage_engine)), + } + } +} + +#[async_trait] +impl TableEngine for MitoEngine { + type Error = Error; + + async fn create_table( + &self, + ctx: &EngineContext, + request: CreateTableRequest, + ) -> Result { + self.inner.create_table(ctx, request).await + } + + async fn alter_table( + &self, + _ctx: &EngineContext, + _request: AlterTableRequest, + ) -> Result { + unimplemented!(); + } + + fn get_table(&self, ctx: &EngineContext, name: &str) -> Result> { + self.inner.get_table(ctx, name) + } + + fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool { + unimplemented!(); + } + + async fn drop_table(&self, _ctx: &EngineContext, _request: DropTableRequest) -> Result<()> { + unimplemented!(); + } +} + +/// FIXME(boyan) impl system catalog to keep table metadata. +struct MitoEngineInner { + tables: RwLock>, + storage_engine: Store, + next_table_id: AtomicU64, +} + +impl MitoEngineInner { + fn new(storage_engine: Store) -> Self { + Self { + tables: RwLock::new(HashMap::default()), + storage_engine, + next_table_id: AtomicU64::new(0), + } + } + + fn next_table_id(&self) -> TableId { + self.next_table_id.fetch_add(1, Ordering::Relaxed) + } +} + +impl MitoEngineInner { + async fn create_table( + &self, + _ctx: &EngineContext, + request: CreateTableRequest, + ) -> Result { + //FIXME(boyan): we only supports creating a demo table right now + //The create table sql is like: + // create table demo(host string, + // ts int64, + // cpu float64, + // memory float64, + // PRIMARY KEY(ts, host)) with regions=1; + + //TODO(boyan): supports multi regions + let region_id: RegionId = 0; + let name = store::gen_region_name(region_id); + + let host_column = + ColumnDescriptorBuilder::new(0, "host", ConcreteDataType::string_datatype()) + .is_nullable(false) + .build(); + let cpu_column = + ColumnDescriptorBuilder::new(1, "cpu", ConcreteDataType::float64_datatype()) + .is_nullable(true) + .build(); + let memory_column = + ColumnDescriptorBuilder::new(2, "memory", ConcreteDataType::float64_datatype()) + .is_nullable(true) + .build(); + let ts_column = + ColumnDescriptorBuilder::new(0, "ts", ConcreteDataType::int64_datatype()).build(); + + let row_key = RowKeyDescriptorBuilder::new(ts_column) + .push_column(host_column) + .enable_version_column(false) + .build(); + + let default_cf = ColumnFamilyDescriptorBuilder::default() + .push_column(cpu_column) + .push_column(memory_column) + .build(); + + let region = self + .storage_engine + .create_region( + &StorageContext::default(), + RegionDescriptor { + id: region_id, + name, + row_key, + default_cf, + extra_cfs: Vec::default(), + }, + ) + .await + .map_err(|e| Box::new(e) as _) + .context(CreateTableSnafu)?; + + // Use region meta schema instead of request schema + let table_meta = TableMetaBuilder::new(region.in_memory_metadata().schema().clone()) + .engine(DEFAULT_ENGINE) + .build(); + + let table_name = request.name; + let table_info = TableInfoBuilder::new(table_name.clone(), table_meta) + .table_id(self.next_table_id()) + .table_version(0u64) + .table_type(TableType::Base) + .desc(request.desc) + .build(); + + let table = Arc::new(MitoTable::new(table_info, region)); + + self.tables + .write() + .unwrap() + .insert(table_name, table.clone()); + + Ok(table) + } + + fn get_table(&self, _ctx: &EngineContext, name: &str) -> Result> { + Ok(self.tables.read().unwrap().get(name).cloned()) + } +} + +#[cfg(test)] +mod tests { + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::*; + use storage::EngineImpl; + use table::requests::InsertRequest; + + use super::*; + + #[tokio::test] + async fn test_creat_table_insert() { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ]; + + let table_engine = MitoEngine::::new(EngineImpl::new()); + + let table_name = "demo"; + let schema = Arc::new(Schema::new(column_schemas)); + let table = table_engine + .create_table( + &EngineContext::default(), + CreateTableRequest { + name: table_name.to_string(), + desc: Some(" a test table".to_string()), + schema: schema.clone(), + }, + ) + .await + .unwrap(); + + assert_eq!(TableType::Base, table.table_type()); + assert_eq!(schema, table.schema()); + + let insert_req = InsertRequest { + table_name: table_name.to_string(), + columns_values: HashMap::default(), + }; + assert_eq!(0, table.insert(insert_req).await.unwrap()); + + let mut columns_values: HashMap = HashMap::with_capacity(4); + columns_values.insert( + "host".to_string(), + Arc::new(StringVector::from(vec!["host1", "host2"])), + ); + columns_values.insert( + "cpu".to_string(), + Arc::new(Float64Vector::from_vec(vec![55.5, 66.6])), + ); + columns_values.insert( + "memory".to_string(), + Arc::new(Float64Vector::from_vec(vec![1024f64, 4096f64])), + ); + columns_values.insert( + "ts".to_string(), + Arc::new(Int64Vector::from_vec(vec![1, 2])), + ); + + let insert_req = InsertRequest { + table_name: table_name.to_string(), + columns_values, + }; + assert_eq!(2, table.insert(insert_req).await.unwrap()); + } +} diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs new file mode 100644 index 0000000000..26b041be5c --- /dev/null +++ b/src/table-engine/src/error.rs @@ -0,0 +1,35 @@ +use std::any::Any; + +use common_error::prelude::*; + +// TODO(boyan): use ErrorExt instead. +pub type BoxedError = Box; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Fail to create table, source: {}", source))] + CreateTable { + source: BoxedError, + backtrace: Backtrace, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + //TODO: should return the source's status code after use ErrorExt in BoxedError. + Error::CreateTable { .. } => StatusCode::InvalidArguments, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/table-engine/src/lib.rs b/src/table-engine/src/lib.rs new file mode 100644 index 0000000000..bf9f81629f --- /dev/null +++ b/src/table-engine/src/lib.rs @@ -0,0 +1,3 @@ +pub mod engine; +pub mod error; +pub mod table; diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs new file mode 100644 index 0000000000..493fb82b9a --- /dev/null +++ b/src/table-engine/src/table.rs @@ -0,0 +1,94 @@ +use std::any::Any; + +use async_trait::async_trait; +use common_query::logical_plan::Expr; +use common_recordbatch::SendableRecordBatchStream; +use snafu::OptionExt; +use store_api::storage::SchemaRef; +use store_api::storage::{PutOperation, Region, WriteContext, WriteRequest}; +use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult}; +use table::requests::InsertRequest; +use table::{ + metadata::{TableInfo, TableType}, + table::Table, +}; + +/// [Table] implementation. +pub struct MitoTable { + table_info: TableInfo, + //TODO(dennis): a table contains multi regions + region: R, +} + +#[async_trait] +impl Table for MitoTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.table_info.meta.schema.clone() + } + + async fn insert(&self, request: InsertRequest) -> TableResult { + if request.columns_values.is_empty() { + return Ok(0); + } + + let mut write_request = R::WriteRequest::new(self.schema()); + + //FIXME(boyan): we can only insert to demo table right now + let mut put_op = <::WriteRequest as WriteRequest>::PutOp::new(); + let mut columns_values = request.columns_values; + let key_columns = vec!["ts", "host"]; + let value_columns = vec!["cpu", "memory"]; + //Add row key and columns + for name in key_columns { + put_op + .add_key_column( + name, + columns_values + .get(name) + .context(MissingColumnSnafu { name })? + .clone(), + ) + .map_err(TableError::new)?; + } + // Add vaue columns + let mut rows_num = 0; + for name in value_columns { + if let Some(v) = columns_values.remove(name) { + rows_num = v.len(); + put_op.add_value_column(name, v).map_err(TableError::new)?; + } + } + write_request.put(put_op).map_err(TableError::new)?; + + let _resp = self + .region + .write(&WriteContext::default(), write_request) + .await + .map_err(TableError::new)?; + + Ok(rows_num) + } + + fn table_type(&self) -> TableType { + self.table_info.table_type + } + + async fn scan( + &self, + _projection: &Option>, + _filters: &[Expr], + _limit: Option, + ) -> TableResult { + unimplemented!(); + } +} + +impl MitoTable { + pub fn new(table_info: TableInfo, region: R) -> Self { + Self { table_info, region } + } +} diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 8b2309192c..e805654b93 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -1,3 +1,45 @@ +use common_error::ext::ErrorExt; + +use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest}; +use crate::TableRef; + /// Table engine abstraction. #[async_trait::async_trait] -pub trait Engine {} +pub trait TableEngine: Send + Sync + Clone { + type Error: ErrorExt + Send + Sync + 'static; + + /// Create a table by given request. + /// + /// Return the created table. + async fn create_table( + &self, + ctx: &EngineContext, + request: CreateTableRequest, + ) -> Result; + + /// Alter table schema, options etc. by given request, + /// + /// Returns the table after altered. + async fn alter_table( + &self, + ctx: &EngineContext, + request: AlterTableRequest, + ) -> Result; + + /// Returns the table by it's name. + fn get_table(&self, ctx: &EngineContext, name: &str) -> Result, Self::Error>; + + /// Returns true when the given table is exists. + fn table_exists(&self, ctx: &EngineContext, name: &str) -> bool; + + /// Drops the given table. + async fn drop_table( + &self, + ctx: &EngineContext, + request: DropTableRequest, + ) -> Result<(), Self::Error>; +} + +/// Storage engine context. +#[derive(Debug, Clone, Default)] +pub struct EngineContext {} diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 1e9a0f4903..c98d714b7c 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -23,6 +23,9 @@ pub enum InnerError { backtrace: Backtrace, }, + #[snafu(display("Missing column when insert, column : {}", name))] + MissingColumn { name: String, backtrace: Backtrace }, + #[snafu(display("Not expected to run ExecutionPlan more than once"))] ExecuteRepeatedly { backtrace: Backtrace }, } diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index b36cf823c3..c71d471628 100644 --- a/src/table/src/lib.rs +++ b/src/table/src/lib.rs @@ -1,5 +1,7 @@ -mod engine; +pub mod engine; pub mod error; +pub mod metadata; +pub mod requests; pub mod table; pub use crate::table::{Table, TableRef}; diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs new file mode 100644 index 0000000000..c095a43ea1 --- /dev/null +++ b/src/table/src/metadata.rs @@ -0,0 +1,168 @@ +use std::collections::HashMap; + +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; +use datatypes::schema::SchemaRef; + +pub type TableId = u64; +pub type TableVersion = u64; + +/// Indicates whether and how a filter expression can be handled by a +/// Table for table scans. +#[derive(Debug, Clone, PartialEq)] +pub enum FilterPushDownType { + /// The expression cannot be used by the provider. + Unsupported, + /// The expression can be used to help minimise the data retrieved, + /// but the provider cannot guarantee that all returned tuples + /// satisfy the filter. The Filter plan node containing this expression + /// will be preserved. + Inexact, + /// The provider guarantees that all returned data satisfies this + /// filter expression. The Filter plan node containing this expression + /// will be removed. + Exact, +} + +/// Indicates the type of this table for metadata/catalog purposes. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TableType { + /// An ordinary physical table. + Base, + /// A non-materialised table that itself uses a query internally to provide data. + View, + /// A transient table. + Temporary, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)] +pub struct TableIdent { + pub table_id: TableId, + pub version: TableVersion, +} + +#[derive(Clone, Debug)] +pub struct TableMeta { + pub schema: SchemaRef, + pub engine: String, + pub engine_options: HashMap, + pub options: HashMap, + pub created_on: DateTime, +} + +#[derive(Clone, Debug)] +pub struct TableInfo { + pub ident: TableIdent, + pub name: String, + pub desc: Option, + pub meta: TableMeta, + pub table_type: TableType, +} + +impl TableIdent { + pub fn new(table_id: TableId) -> Self { + Self { + table_id, + version: 0, + } + } +} + +pub struct TableMetaBuilder { + schema: SchemaRef, + engine: String, + engine_options: HashMap, + options: HashMap, +} + +impl TableMetaBuilder { + pub fn new(schema: SchemaRef) -> Self { + Self { + schema, + engine: String::default(), + engine_options: HashMap::default(), + options: HashMap::default(), + } + } + + pub fn engine(mut self, engine: impl Into) -> Self { + self.engine = engine.into(); + self + } + + pub fn table_option(mut self, name: &str, val: &str) -> Self { + self.options.insert(name.to_string(), val.to_string()); + self + } + + pub fn engine_option(mut self, name: &str, val: &str) -> Self { + self.engine_options + .insert(name.to_string(), val.to_string()); + self + } + + pub fn build(self) -> TableMeta { + TableMeta { + schema: self.schema, + engine: self.engine, + engine_options: self.engine_options, + options: self.options, + // TODO(dennis): use time utilities helper function + created_on: Utc.from_utc_datetime(&NaiveDateTime::from_timestamp(0, 0)), + } + } +} + +pub struct TableInfoBuilder { + ident: TableIdent, + name: String, + desc: Option, + meta: TableMeta, + table_type: TableType, +} + +impl TableInfoBuilder { + pub fn new(name: impl Into, meta: TableMeta) -> Self { + Self { + ident: TableIdent::new(0), + name: name.into(), + desc: None, + meta, + table_type: TableType::Base, + } + } + + pub fn table_id(mut self, id: impl Into) -> Self { + self.ident.table_id = id.into(); + self + } + + pub fn table_version(mut self, version: impl Into) -> Self { + self.ident.version = version.into(); + self + } + + pub fn table_type(mut self, table_type: TableType) -> Self { + self.table_type = table_type; + self + } + + pub fn metadata(mut self, meta: TableMeta) -> Self { + self.meta = meta; + self + } + + pub fn desc(mut self, desc: Option) -> Self { + self.desc = desc; + self + } + + pub fn build(self) -> TableInfo { + TableInfo { + ident: self.ident, + name: self.name, + desc: self.desc, + meta: self.meta, + table_type: self.table_type, + } + } +} diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs new file mode 100644 index 0000000000..48a144bfee --- /dev/null +++ b/src/table/src/requests.rs @@ -0,0 +1,24 @@ +//! Table and TableEngine requests +use std::collections::HashMap; + +use datatypes::prelude::VectorRef; +use datatypes::schema::SchemaRef; + +/// Insert request +pub struct InsertRequest { + pub table_name: String, + pub columns_values: HashMap, +} + +/// Create table request +pub struct CreateTableRequest { + pub name: String, + pub desc: Option, + pub schema: SchemaRef, +} + +/// Alter table request +pub struct AlterTableRequest {} + +/// Drop table request +pub struct DropTableRequest {} diff --git a/src/table/src/table.rs b/src/table/src/table.rs index ac92cbc389..a1d7c4487d 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -2,70 +2,15 @@ pub mod adapter; pub mod numbers; use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; -use chrono::DateTime; -use chrono::Utc; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; -use datatypes::schema::{Schema, SchemaRef}; +use datatypes::schema::SchemaRef; use crate::error::Result; - -pub type TableId = u64; -pub type TableVersion = u64; - -/// Indicates whether and how a filter expression can be handled by a -/// Table for table scans. -#[derive(Debug, Clone, PartialEq)] -pub enum TableProviderFilterPushDown { - /// The expression cannot be used by the provider. - Unsupported, - /// The expression can be used to help minimise the data retrieved, - /// but the provider cannot guarantee that all returned tuples - /// satisfy the filter. The Filter plan node containing this expression - /// will be preserved. - Inexact, - /// The provider guarantees that all returned data satisfies this - /// filter expression. The Filter plan node containing this expression - /// will be removed. - Exact, -} - -/// Indicates the type of this table for metadata/catalog purposes. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum TableType { - /// An ordinary physical table. - Base, - /// A non-materialised table that itself uses a query internally to provide data. - View, - /// A transient table. - Temporary, -} - -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)] -pub struct TableIdent { - pub table_id: TableId, - pub version: TableVersion, -} - -#[derive(Debug)] -pub struct TableInfo { - pub ident: TableIdent, - pub name: String, - pub desc: Option, - pub meta: TableMeta, -} - -#[derive(Clone, Debug)] -pub struct TableMeta { - pub schema: Arc, - pub engine: String, - pub engine_options: HashMap, - pub options: HashMap, - pub created_on: DateTime, -} +use crate::metadata::{FilterPushDownType, TableType}; +use crate::requests::InsertRequest; /// Table abstraction. #[async_trait::async_trait] @@ -82,6 +27,11 @@ pub trait Table: Send + Sync { TableType::Base } + /// Insert values into table. + async fn insert(&self, _request: InsertRequest) -> Result { + unimplemented!(); + } + /// Scan the table and returns a SendableRecordBatchStream. async fn scan( &self, @@ -96,8 +46,8 @@ pub trait Table: Send + Sync { /// Tests whether the table provider can make use of a filter expression /// to optimise data retrieval. - fn supports_filter_pushdown(&self, _filter: &Expr) -> Result { - Ok(TableProviderFilterPushDown::Unsupported) + fn supports_filter_pushdown(&self, _filter: &Expr) -> Result { + Ok(FilterPushDownType::Unsupported) } } diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index b15675cbff..322b4e434b 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -31,7 +31,7 @@ use futures::Stream; use snafu::prelude::*; use crate::error::{self, Result}; -use crate::table::{Table, TableProviderFilterPushDown, TableRef, TableType}; +use crate::table::{FilterPushDownType, Table, TableRef, TableType}; /// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan. struct ExecutionPlanAdapter { @@ -139,7 +139,7 @@ impl TableProvider for DfTableProviderAdapter { filters: &[DfExpr], limit: Option, ) -> DfResult> { - let filters: Vec = filters.iter().map(Clone::clone).map(Expr::new).collect(); + let filters: Vec = filters.iter().map(Clone::clone).map(Into::into).collect(); let stream = self.table.scan(projection, &filters, limit).await?; Ok(Arc::new(ExecutionPlanAdapter { @@ -151,13 +151,11 @@ impl TableProvider for DfTableProviderAdapter { fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult { let p = self .table - .supports_filter_pushdown(&Expr::new(filter.clone()))?; + .supports_filter_pushdown(&filter.clone().into())?; match p { - TableProviderFilterPushDown::Unsupported => { - Ok(DfTableProviderFilterPushDown::Unsupported) - } - TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact), - TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact), + FilterPushDownType::Unsupported => Ok(DfTableProviderFilterPushDown::Unsupported), + FilterPushDownType::Inexact => Ok(DfTableProviderFilterPushDown::Inexact), + FilterPushDownType::Exact => Ok(DfTableProviderFilterPushDown::Exact), } } } @@ -223,17 +221,15 @@ impl Table for TableAdapter { ))) } - fn supports_filter_pushdown(&self, filter: &Expr) -> Result { + fn supports_filter_pushdown(&self, filter: &Expr) -> Result { match self .table_provider .supports_filter_pushdown(filter.df_expr()) .context(error::DatafusionSnafu)? { - DfTableProviderFilterPushDown::Unsupported => { - Ok(TableProviderFilterPushDown::Unsupported) - } - DfTableProviderFilterPushDown::Inexact => Ok(TableProviderFilterPushDown::Inexact), - DfTableProviderFilterPushDown::Exact => Ok(TableProviderFilterPushDown::Exact), + DfTableProviderFilterPushDown::Unsupported => Ok(FilterPushDownType::Unsupported), + DfTableProviderFilterPushDown::Inexact => Ok(FilterPushDownType::Inexact), + DfTableProviderFilterPushDown::Exact => Ok(FilterPushDownType::Exact), } } }