TableEngine and SqlHandler impl (#45)

* Impl TableEngine, bridge to storage

* Impl sql handler to process insert sql

* fix: minor changes and typo

* test: add datanode test

* test: add table-engine test

* fix: code style

* refactor: split out insert mod from sql and minor changes by CR

* refactor: replace with_context with context
This commit is contained in:
dennis zhuang
2022-06-17 11:36:49 +08:00
committed by GitHub
parent e03ac2fc2b
commit e78c015fc0
36 changed files with 1438 additions and 110 deletions

View File

@@ -6,24 +6,31 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
axum = "0.5"
axum-macros = "0.2"
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
datatypes = { path = "../datatypes"}
hyper = { version = "0.14", features = ["full"] }
metrics = "0.18"
query = { path = "../query" }
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }
storage = { path = "../storage" }
store-api = { path = "../store-api" }
table = { path = "../table" }
table-engine = { path = "../table-engine" }
tokio = { version = "1.18", features = ["full"] }
tower = { version = "0.4", features = ["full"]}
tower-http = { version ="0.3", features = ["full"]}
[dev-dependencies]
axum-test-helper = "0.1"
common-query = { path = "../common/query" }
[dev-dependencies.arrow]
package = "arrow2"

View File

@@ -18,7 +18,7 @@ pub struct Datanode {
opts: DatanodeOptions,
services: Services,
_catalog_list: CatalogListRef,
_instance: InstanceRef,
instance: InstanceRef,
}
impl Datanode {
@@ -30,11 +30,12 @@ impl Datanode {
opts,
services: Services::new(instance.clone()),
_catalog_list: catalog_list,
_instance: instance,
instance,
})
}
pub async fn start(&self) -> Result<()> {
self.instance.start().await?;
self.services.start(&self.opts).await
}
}

View File

@@ -1,6 +1,12 @@
use std::any::Any;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
use table::error::Error as TableError;
use table_engine::error::Error as TableEngineError;
// TODO(boyan): use ErrorExt instead.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
/// Business error of datanode.
#[derive(Debug, Snafu)]
@@ -18,6 +24,55 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Fail to create table: {}, {}", table_name, source))]
CreateTable {
table_name: String,
source: TableEngineError,
},
#[snafu(display("Fail to get table: {}, {}", table_name, source))]
GetTable {
table_name: String,
source: BoxedError,
},
#[snafu(display("Table not found: {}", table_name))]
TableNotFound { table_name: String },
#[snafu(display("Column {} not found in table {}", column_name, table_name))]
ColumnNotFound {
column_name: String,
table_name: String,
},
#[snafu(display(
"Columns and values number mismatch, columns: {}, values: {}",
columns,
values
))]
ColumnValuesNumberMismatch { columns: usize, values: usize },
#[snafu(display("Fail to parse value: {}, {}", msg, backtrace))]
ParseSqlValue { msg: String, backtrace: Backtrace },
#[snafu(display(
"Column {} expect type: {:?}, actual: {:?}",
column_name,
expect,
actual
))]
ColumnTypeMismatch {
column_name: String,
expect: ConcreteDataType,
actual: ConcreteDataType,
},
#[snafu(display("Fail to insert value to table: {}, {}", table_name, source))]
Insert {
table_name: String,
source: TableError,
},
// The error source of http error is clear even without backtrace now so
// a backtrace is not carried in this variant.
#[snafu(display("Fail to start HTTP server, source: {}", source))]
@@ -38,6 +93,14 @@ impl ErrorExt for Error {
Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(),
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. } | Error::ParseAddr { .. } => StatusCode::Internal,
Error::CreateTable { source, .. } => source.status_code(),
Error::GetTable { .. } => StatusCode::Internal,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::ColumnValuesNumberMismatch { .. }
| Error::ParseSqlValue { .. }
| Error::ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
Error::Insert { source, .. } => source.status_code(),
}
}

View File

@@ -1,17 +1,30 @@
use std::sync::Arc;
use query::catalog::CatalogListRef;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use query::catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use snafu::ResultExt;
use sql::statements::statement::Statement;
use storage::EngineImpl;
use table::engine::EngineContext;
use table::engine::TableEngine;
use table::requests::CreateTableRequest;
use table_engine::engine::MitoEngine;
use crate::error::{ExecuteSqlSnafu, Result};
use crate::error::{CreateTableSnafu, ExecuteSqlSnafu, Result};
use crate::sql::SqlHandler;
type DefaultEngine = MitoEngine<EngineImpl>;
// An abstraction to read/write services.
pub struct Instance {
// Query service
query_engine: QueryEngineRef,
table_engine: DefaultEngine,
sql_handler: SqlHandler<DefaultEngine>,
// Catalog list
_catalog_list: CatalogListRef,
catalog_list: CatalogListRef,
}
pub type InstanceRef = Arc<Instance>;
@@ -20,22 +33,86 @@ impl Instance {
pub fn new(catalog_list: CatalogListRef) -> Self {
let factory = QueryEngineFactory::new(catalog_list.clone());
let query_engine = factory.query_engine().clone();
let table_engine = DefaultEngine::new(EngineImpl::new());
Self {
query_engine,
_catalog_list: catalog_list,
sql_handler: SqlHandler::new(table_engine.clone()),
table_engine,
catalog_list,
}
}
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
let logical_plan = self
let stmt = self
.query_engine
.sql_to_plan(sql)
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
self.query_engine
.execute(&logical_plan)
match stmt {
Statement::Query(_) => {
let logical_plan = self
.query_engine
.statement_to_plan(stmt)
.context(ExecuteSqlSnafu)?;
self.query_engine
.execute(&logical_plan)
.await
.context(ExecuteSqlSnafu)
}
Statement::Insert(_) => {
let schema_provider = self
.catalog_list
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
let request = self
.sql_handler
.statement_to_request(schema_provider, stmt)?;
self.sql_handler.execute(request).await
}
_ => unimplemented!(),
}
}
/// Prepares the instance for serving by creating a hard-coded `demo` table
/// through the table engine and registering it with the default
/// catalog/schema of the catalog list.
///
/// # Errors
/// Returns a `CreateTable` error when the table engine fails to create the
/// table.
///
/// # Panics
/// Panics if the default catalog or schema is missing from the catalog list,
/// or if registering the table fails (all three are `unwrap`ped).
pub async fn start(&self) -> Result<()> {
// FIXME(boyan): create a demo table for test
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];
let table_name = "demo";
let table = self
.table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: Arc::new(Schema::new(column_schemas)),
},
)
.await
// NOTE(review): this `ExecuteSqlSnafu` context looks like a line left over
// from the old `execute_sql` body in this diff view; `CreateTable` expects a
// `TableEngineError` source, so both contexts cannot be chained — confirm
// against the real file.
.context(ExecuteSqlSnafu)
.context(CreateTableSnafu { table_name })?;
// Look up the default schema so the new table becomes visible to queries.
let schema_provider = self
.catalog_list
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
.unwrap();
Ok(())
}
}
@@ -48,7 +125,27 @@ mod tests {
use super::*;
#[tokio::test]
async fn test_execute_sql() {
async fn test_execute_insert() {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let instance = Instance::new(catalog_list);
instance.start().await.unwrap();
let output = instance
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test]
async fn test_execute_query() {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let instance = Instance::new(catalog_list);

View File

@@ -4,6 +4,7 @@ pub mod error;
pub mod instance;
mod metric;
pub mod server;
mod sql;
pub use crate::datanode::Datanode;
pub use crate::datanode::DatanodeOptions;

179
src/datanode/src/sql.rs Normal file
View File

@@ -0,0 +1,179 @@
//! sql handler
mod insert;
use query::catalog::schema::SchemaProviderRef;
use query::query_engine::Output;
use snafu::{OptionExt, ResultExt};
use sql::statements::statement::Statement;
use table::engine::{EngineContext, TableEngine};
use table::requests::*;
use table::TableRef;
use crate::error::{GetTableSnafu, Result, TableNotFoundSnafu};
/// A request built from a non-query SQL statement, ready to be passed to
/// `SqlHandler::execute`.
pub enum SqlRequest {
/// Insert rows into a table, described by a table `InsertRequest`.
Insert(InsertRequest),
}
/// Handler that executes SQL statements other than queries.
///
/// The engine type is intentionally left unbounded here: every `impl` block
/// that needs table-engine behavior already requires `Engine: TableEngine`,
/// so repeating the bound on the struct only forces it onto every type that
/// merely names `SqlHandler<_>`.
pub struct SqlHandler<Engine> {
    /// Table engine used to look up the tables that requests operate on.
    table_engine: Engine,
}
impl<Engine: TableEngine> SqlHandler<Engine> {
pub fn new(table_engine: Engine) -> Self {
Self { table_engine }
}
pub async fn execute(&self, request: SqlRequest) -> Result<Output> {
match request {
SqlRequest::Insert(req) => self.insert(req).await,
}
}
pub(crate) fn get_table(&self, table_name: &str) -> Result<TableRef> {
self.table_engine
.get_table(&EngineContext::default(), table_name)
.map_err(|e| Box::new(e) as _)
.context(GetTableSnafu { table_name })?
.context(TableNotFoundSnafu { table_name })
}
// Cast sql statement into sql request
pub(crate) fn statement_to_request(
&self,
schema_provider: SchemaProviderRef,
statement: Statement,
) -> Result<SqlRequest> {
match statement {
Statement::Insert(stmt) => self.insert_to_request(schema_provider, *stmt),
_ => unimplemented!(),
}
}
}
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::sync::Arc;

    use common_query::logical_plan::Expr;
    use common_recordbatch::SendableRecordBatchStream;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
    use datatypes::value::Value;
    use query::catalog::memory;
    use query::catalog::schema::SchemaProvider;
    use query::error::Result as QueryResult;
    use query::QueryEngineFactory;
    use storage::EngineImpl;
    use table::error::Result as TableResult;
    use table::{Table, TableRef};
    use table_engine::engine::MitoEngine;

    use super::*;

    /// Table stub that only exposes the `demo` schema; scanning is not supported.
    struct DemoTable;

    #[async_trait::async_trait]
    impl Table for DemoTable {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            let host = ColumnSchema::new("host", ConcreteDataType::string_datatype(), false);
            let cpu = ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true);
            let memory = ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true);
            let ts = ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true);

            Arc::new(Schema::new(vec![host, cpu, memory, ts]))
        }

        async fn scan(
            &self,
            _projection: &Option<Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> TableResult<SendableRecordBatchStream> {
            unimplemented!();
        }
    }

    /// Schema provider stub that knows exactly one table, `demo`.
    struct MockSchemaProvider;

    impl SchemaProvider for MockSchemaProvider {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn table_names(&self) -> Vec<String> {
            vec!["demo".to_string()]
        }

        fn table(&self, name: &str) -> Option<TableRef> {
            assert_eq!(name, "demo");
            Some(Arc::new(DemoTable))
        }

        fn register_table(&self, _name: String, _table: TableRef) -> QueryResult<Option<TableRef>> {
            unimplemented!();
        }

        fn deregister_table(&self, _name: &str) -> QueryResult<Option<TableRef>> {
            unimplemented!();
        }

        fn table_exist(&self, name: &str) -> bool {
            name == "demo"
        }
    }

    #[test]
    fn test_statement_to_request() {
        let sql = r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#;

        // Parse the statement with a query engine backed by an in-memory catalog.
        let catalog_list = memory::new_memory_catalog_list().unwrap();
        let factory = QueryEngineFactory::new(catalog_list);
        let query_engine = factory.query_engine().clone();
        let stmt = query_engine.sql_to_statement(sql).unwrap();

        let sql_handler = SqlHandler::new(MitoEngine::<EngineImpl>::new(EngineImpl::new()));
        let schema_provider = Arc::new(MockSchemaProvider);
        let request = sql_handler
            .statement_to_request(schema_provider, stmt)
            .unwrap();

        let SqlRequest::Insert(req) = request;
        assert_eq!(req.table_name, "demo");
        let columns_values = req.columns_values;
        assert_eq!(4, columns_values.len());

        // Every column must hold exactly the two inserted rows, in order.
        let assert_column = |name: &str, first: Value, second: Value| {
            let column = &columns_values[name];
            assert_eq!(2, column.len());
            assert_eq!(first, column.get(0));
            assert_eq!(second, column.get(1));
        };

        assert_column("host", Value::from("host1"), Value::from("host2"));
        assert_column("cpu", Value::from(66.6f64), Value::from(88.8f64));
        assert_column("memory", Value::from(1024f64), Value::from(333.3f64));
        assert_column(
            "ts",
            Value::from(1655276557000i64),
            Value::from(1655276558000i64),
        );
    }
}

View File

@@ -0,0 +1,263 @@
use std::str::FromStr;
use datatypes::prelude::ConcreteDataType;
use datatypes::prelude::VectorBuilder;
use datatypes::value::Value;
use query::catalog::schema::SchemaProviderRef;
use query::query_engine::Output;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use sql::ast::Value as SqlValue;
use sql::statements::insert::Insert;
use table::engine::TableEngine;
use table::requests::*;
use crate::error::{
ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu,
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
};
use crate::sql::{SqlHandler, SqlRequest};
impl<Engine: TableEngine> SqlHandler<Engine> {
pub(crate) async fn insert(&self, req: InsertRequest) -> Result<Output> {
let table_name = &req.table_name.to_string();
let table = self.get_table(table_name)?;
let affected_rows = table
.insert(req)
.await
.context(InsertSnafu { table_name })?;
Ok(Output::AffectedRows(affected_rows))
}
pub(crate) fn insert_to_request(
&self,
schema_provider: SchemaProviderRef,
stmt: Insert,
) -> Result<SqlRequest> {
let columns = stmt.columns();
let values = stmt.values();
//TODO(dennis): table name may be in the form of `catalog.schema.table`,
// but we don't process it right now.
let table_name = stmt.table_name();
let table = schema_provider
.table(&table_name)
.context(TableNotFoundSnafu {
table_name: &table_name,
})?;
let schema = table.schema();
let columns_num = if columns.is_empty() {
schema.column_schemas().len()
} else {
columns.len()
};
let rows_num = values.len();
let mut columns_builders: Vec<(&String, &ConcreteDataType, VectorBuilder)> =
Vec::with_capacity(columns_num);
if columns.is_empty() {
for column_schema in schema.column_schemas() {
let data_type = &column_schema.data_type;
columns_builders.push((
&column_schema.name,
data_type,
VectorBuilder::with_capacity(data_type.clone(), rows_num),
));
}
} else {
for column_name in columns {
let column_schema =
schema.column_schema_by_name(column_name).with_context(|| {
ColumnNotFoundSnafu {
table_name: &table_name,
column_name: column_name.to_string(),
}
})?;
let data_type = &column_schema.data_type;
columns_builders.push((
column_name,
data_type,
VectorBuilder::with_capacity(data_type.clone(), rows_num),
));
}
}
// Convert rows into columns
for row in values {
ensure!(
row.len() == columns_num,
ColumnValuesNumberMismatchSnafu {
columns: columns_num,
values: row.len(),
}
);
for (sql_val, (column_name, data_type, builder)) in
row.iter().zip(columns_builders.iter_mut())
{
add_row_to_vector(column_name, data_type, sql_val, builder)?;
}
}
Ok(SqlRequest::Insert(InsertRequest {
table_name,
columns_values: columns_builders
.into_iter()
.map(|(c, _, mut b)| (c.to_owned(), b.finish()))
.collect(),
}))
}
}
/// Parses `sql_val` as a value of `data_type` and appends it to `builder`.
///
/// `column_name` is only used to build the error reported when the SQL value
/// does not fit the column type.
fn add_row_to_vector(
    column_name: &str,
    data_type: &ConcreteDataType,
    sql_val: &SqlValue,
    builder: &mut VectorBuilder,
) -> Result<()> {
    parse_sql_value(column_name, data_type, sql_val).map(|parsed| {
        builder.push(&parsed);
    })
}
fn parse_sql_value(
column_name: &str,
data_type: &ConcreteDataType,
sql_val: &SqlValue,
) -> Result<Value> {
Ok(match sql_val {
SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?,
SqlValue::Null => Value::Null,
SqlValue::Boolean(b) => {
ensure!(
data_type.is_boolean(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
actual: ConcreteDataType::boolean_datatype(),
}
);
(*b).into()
}
SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => {
ensure!(
data_type.is_string(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
actual: ConcreteDataType::string_datatype(),
}
);
s.to_owned().into()
}
_ => todo!("Other sql value"),
})
}
// Expands to a `match` on `$data_type` that parses the textual number `$n`
// into a `Value`.
//
// Arguments:
// - `$data_type`: expression evaluating to the column's `ConcreteDataType`.
// - `$n`: identifier bound to the number text.
// - a non-empty list of `(Variant, primitive)` pairs, mapping a
//   `ConcreteDataType` variant to the Rust primitive used for parsing.
//
// The expansion uses `?`, so it must be placed in a function that returns
// this crate's `Result`.
macro_rules! parse_number_to_value {
($data_type: expr, $n: ident, $(($Type: ident, $PrimitiveType: ident)), +) => {
match $data_type {
$(
// One arm per supported variant: parse as the paired primitive type.
ConcreteDataType::$Type(_) => {
let n = parse_sql_number::<$PrimitiveType>($n)?;
Ok(Value::from(n))
},
)+
// Any data type not listed by the caller cannot hold a SQL number.
_ => ParseSqlValueSnafu {
msg: format!("Fail to parse number {}, invalid column type: {:?}",
$n, $data_type
)}.fail(),
}
}
}
/// Parses the textual SQL number `n` into a `Value` of the given `data_type`.
///
/// Supported column types are the 8/16/32/64-bit unsigned and signed
/// integers and the 32/64-bit floats listed below.
///
/// # Errors
/// Returns a `ParseSqlValue` error when `data_type` is not one of the listed
/// types, or when `n` cannot be parsed as the corresponding primitive.
fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Result<Value> {
parse_number_to_value!(
data_type,
n,
(UInt8, u8),
(UInt16, u16),
(UInt32, u32),
(UInt64, u64),
(Int8, i8),
(Int16, i16),
(Int32, i32),
(Int64, i64),
(Float64, f64),
(Float32, f32)
)
}
/// Parses `n` as a number of type `R`.
///
/// On failure the underlying parse error is rendered with `Debug` into the
/// message of a `ParseSqlValue` error.
fn parse_sql_number<R: FromStr + std::fmt::Debug>(n: &str) -> Result<R>
where
    <R as FromStr>::Err: std::fmt::Debug,
{
    n.parse::<R>().or_else(|e| {
        ParseSqlValueSnafu {
            msg: format!("Fail to parse number {}, {:?}", n, e),
        }
        .fail()
    })
}
#[cfg(test)]
mod tests {
    use datatypes::value::OrderedFloat;

    use super::*;

    /// Numbers are parsed per column type; a non-numeric column is rejected.
    #[test]
    fn test_sql_number_to_value() {
        let float64 = ConcreteDataType::float64_datatype();
        let parsed = sql_number_to_value(&float64, "3.0").unwrap();
        assert_eq!(Value::Float64(OrderedFloat(3.0)), parsed);

        let int32 = ConcreteDataType::int32_datatype();
        let parsed = sql_number_to_value(&int32, "999").unwrap();
        assert_eq!(Value::Int32(999), parsed);

        let string = ConcreteDataType::string_datatype();
        let rejected = sql_number_to_value(&string, "999");
        assert!(rejected.is_err(), "parse value error is: {:?}", rejected);
    }

    /// Covers the happy paths for null/boolean/number literals and the two
    /// mismatch errors (number into boolean column, boolean into float column).
    #[test]
    fn test_parse_sql_value() {
        let float64 = ConcreteDataType::float64_datatype();
        let boolean = ConcreteDataType::boolean_datatype();

        let null_literal = SqlValue::Null;
        let bool_literal = SqlValue::Boolean(true);
        let number_literal = SqlValue::Number("3.0".to_string(), false);

        assert_eq!(
            Value::Null,
            parse_sql_value("a", &float64, &null_literal).unwrap()
        );
        assert_eq!(
            Value::Boolean(true),
            parse_sql_value("a", &boolean, &bool_literal).unwrap()
        );
        assert_eq!(
            Value::Float64(OrderedFloat(3.0)),
            parse_sql_value("a", &float64, &number_literal).unwrap()
        );

        let number_into_boolean = parse_sql_value("a", &boolean, &number_literal);
        assert!(number_into_boolean.is_err());
        assert!(format!("{:?}", number_into_boolean)
            .contains("Fail to parse number 3.0, invalid column type: Boolean(BooleanType)"));

        let boolean_into_float = parse_sql_value("a", &float64, &bool_literal);
        assert!(boolean_into_float.is_err());
        assert!(format!("{:?}", boolean_into_float).contains(
            "column_name: \"a\", expect: Float64(Float64), actual: Boolean(BooleanType)"
        ));
    }
}