From 5f322ba16ebb450917017dfb6e2408b4dc1511a4 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Thu, 22 Sep 2022 10:43:21 +0800 Subject: [PATCH] feat: impl default constraint for column (#273) * feat: impl default value for column in schema * test: add test for column's default value * refactor: rename ColumnDefaultValue to ColumnDefaultConstraint * fix: timestamp column may be a constant vector * fix: test_shutdown_pg_server * fix: typo Co-authored-by: LFC * fix: typo Co-authored-by: LFC * fix: typo Co-authored-by: LFC * chore: use table_info directly Co-authored-by: LFC * refactor: by CR comments Co-authored-by: LFC --- Cargo.lock | 1 + src/api/greptime/v1/column.proto | 1 + src/catalog/src/system.rs | 3 +- src/client/src/database.rs | 9 +- src/client/src/error.rs | 10 +- src/datanode/src/error.rs | 43 +-- src/datanode/src/server/grpc/ddl.rs | 42 ++- src/datanode/src/server/grpc/insert.rs | 3 +- src/datanode/src/sql.rs | 3 +- src/datanode/src/sql/create.rs | 3 +- src/datanode/src/sql/insert.rs | 221 +------------- src/datanode/src/tests/grpc_test.rs | 5 + src/datanode/src/tests/test_util.rs | 3 +- src/datatypes/src/error.rs | 7 + src/datatypes/src/schema.rs | 152 ++++++++-- src/frontend/src/error.rs | 12 + src/frontend/src/instance.rs | 79 +++-- src/script/src/table.rs | 4 +- src/servers/tests/postgres/mod.rs | 24 +- src/sql/Cargo.toml | 1 + src/sql/src/ast.rs | 4 +- src/sql/src/error.rs | 34 ++- src/sql/src/statements.rs | 275 +++++++++++++++++- .../benches/memtable/util/schema_util.rs | 3 +- src/storage/src/proto/write_batch.rs | 5 +- src/storage/src/schema.rs | 16 +- src/storage/src/test_util/schema_util.rs | 4 +- src/storage/src/write_batch.rs | 53 +++- src/store-api/src/storage.rs | 4 +- src/store-api/src/storage/descriptors.rs | 28 +- src/store-api/src/storage/requests.rs | 2 +- src/table-engine/src/engine.rs | 129 ++++++++ src/table-engine/src/error.rs | 4 + src/table-engine/src/table.rs | 97 +++++- src/table-engine/src/table/test_util.rs | 3 +- 35 files 
changed, 938 insertions(+), 349 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab6ecd3918..500022b528 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4831,6 +4831,7 @@ name = "sql" version = "0.1.0" dependencies = [ "common-error", + "common-time", "datatypes", "snafu", "sqlparser", diff --git a/src/api/greptime/v1/column.proto b/src/api/greptime/v1/column.proto index 3abcbd54f7..59338bbd68 100644 --- a/src/api/greptime/v1/column.proto +++ b/src/api/greptime/v1/column.proto @@ -56,6 +56,7 @@ message ColumnDef { string name = 1; ColumnDataType datatype = 2; bool is_nullable = 3; + optional bytes default_constraint = 4; } enum ColumnDataType { diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 0e5db53d0e..dee3b25d11 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -151,7 +151,8 @@ fn build_system_catalog_schema() -> Schema { ]; // The schema of this table must be valid. - SchemaBuilder::from(cols) + SchemaBuilder::try_from(cols) + .unwrap() .timestamp_index(2) .build() .unwrap() diff --git a/src/client/src/database.rs b/src/client/src/database.rs index edf166ef3b..0da28fd5d7 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -23,8 +23,11 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error; use crate::{ - error::DatanodeSnafu, error::DecodeSelectSnafu, error::EncodePhysicalSnafu, - error::MissingFieldSnafu, Client, Result, + error::{ + ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu, + MissingFieldSnafu, + }, + Client, Result, }; pub const PROTOCOL_VERSION: u32 = 1; @@ -194,7 +197,7 @@ impl TryFrom for Output { }) .collect::>(); - let schema = Arc::new(Schema::new(column_schemas)); + let schema = Arc::new(Schema::try_new(column_schemas).context(ConvertSchemaSnafu)?); let recordbatches = if vectors.is_empty() { RecordBatches::try_new(schema, vec![]) } else { diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 29bbd20b7b..7169150ae4 100644 
--- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -79,6 +79,12 @@ pub enum Error { #[snafu(display("Missing required field in protobuf, field: {}", field))] MissingField { field: String, backtrace: Backtrace }, + + #[snafu(display("Failed to convert schema, source: {}", source))] + ConvertSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -97,7 +103,9 @@ impl ErrorExt for Error { | Error::InvalidColumnProto { .. } | Error::ColumnDataType { .. } | Error::MissingField { .. } => StatusCode::Internal, - Error::CreateVector { source } => source.status_code(), + Error::ConvertSchema { source } | Error::CreateVector { source } => { + source.status_code() + } Error::CreateRecordBatches { source } => source.status_code(), Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 7be0fd58eb..71026a42db 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -2,7 +2,6 @@ use std::any::Any; use api::serde::DecodeError; use common_error::prelude::*; -use datatypes::prelude::ConcreteDataType; use storage::error::Error as StorageError; use table::error::Error as TableError; @@ -68,19 +67,10 @@ pub enum Error { ))] ColumnValuesNumberMismatch { columns: usize, values: usize }, - #[snafu(display("Failed to parse value: {}, {}", msg, backtrace))] - ParseSqlValue { msg: String, backtrace: Backtrace }, - - #[snafu(display( - "Column {} expect type: {:?}, actual: {:?}", - column_name, - expect, - actual - ))] - ColumnTypeMismatch { - column_name: String, - expect: ConcreteDataType, - actual: ConcreteDataType, + #[snafu(display("Failed to parse sql value, source: {}", source))] + ParseSqlValue { + #[snafu(backtrace)] + source: sql::error::Error, }, #[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))] @@ -189,6 +179,12 @@ pub enum Error { source: api::error::Error, }, + 
#[snafu(display("Column default constraint error, source: {}", source))] + ColumnDefaultConstraint { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + #[snafu(display("Failed to parse SQL, source: {}", source))] ParseSql { #[snafu(backtrace)] @@ -216,23 +212,32 @@ impl ErrorExt for Error { Error::ExecuteSql { source } => source.status_code(), Error::ExecutePhysicalPlan { source } => source.status_code(), Error::NewCatalog { source } => source.status_code(), + Error::CreateTable { source, .. } | Error::GetTable { source, .. } | Error::AlterTable { source, .. } => source.status_code(), + Error::Insert { source, .. } => source.status_code(), - Error::ConvertSchema { source, .. } => source.status_code(), + Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, + + Error::ParseSqlValue { source, .. } | Error::ParseSql { source, .. } => { + source.status_code() + } + + Error::ColumnDefaultConstraint { source, .. } + | Error::CreateSchema { source, .. } + | Error::ConvertSchema { source, .. } => source.status_code(), + Error::ColumnValuesNumberMismatch { .. } - | Error::ParseSqlValue { .. } - | Error::ColumnTypeMismatch { .. } | Error::IllegalInsertData { .. } | Error::DecodeInsert { .. } | Error::InvalidSql { .. } - | Error::CreateSchema { .. } | Error::KeyColumnNotFound { .. } | Error::MissingField { .. } | Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments, + // TODO(yingwen): Further categorize http error. Error::StartServer { .. } | Error::ParseAddr { .. } @@ -244,7 +249,7 @@ impl ErrorExt for Error { | Error::IntoPhysicalPlan { .. } | Error::UnsupportedExpr { .. } | Error::ColumnDataType { .. } => StatusCode::Internal, - Error::ParseSql { source } => source.status_code(), + Error::InitBackend { .. 
} => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), Error::StartScriptManager { source } => source.status_code(), diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 983bf4930b..3c633ae42c 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -4,12 +4,13 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::{alter_expr::Kind, AdminResult, AlterExpr, ColumnDef, CreateExpr}; use common_error::prelude::{ErrorExt, StatusCode}; use common_query::Output; +use datatypes::schema::ColumnDefaultConstraint; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::TryFutureExt; use snafu::prelude::*; use table::requests::{AlterKind, AlterTableRequest, CreateTableRequest}; -use crate::error::{self, MissingFieldSnafu, Result}; +use crate::error::{self, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result}; use crate::instance::Instance; use crate::server::grpc::handler::AdminResultBuilder; use crate::sql::SqlRequest; @@ -131,7 +132,8 @@ fn create_table_schema(expr: &CreateExpr) -> Result { name: &expr.time_index, })?; Ok(Arc::new( - SchemaBuilder::from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .context(error::CreateSchemaSnafu)? 
.timestamp_index(ts_index) .build() .context(error::CreateSchemaSnafu)?, @@ -145,6 +147,12 @@ fn create_column_schema(column_def: &ColumnDef) -> Result { name: column_def.name.clone(), data_type: data_type.into(), is_nullable: column_def.is_nullable, + default_constraint: match &column_def.default_constraint { + None => None, + Some(v) => Some( + ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?, + ), + }, }) } @@ -154,6 +162,7 @@ mod tests { use catalog::MIN_USER_TABLE_ID; use datatypes::prelude::ConcreteDataType; + use datatypes::value::Value; use super::*; use crate::tests::test_util; @@ -206,6 +215,7 @@ mod tests { name: "a".to_string(), datatype: 1024, is_nullable: true, + default_constraint: None, }; let result = create_column_schema(&column_def); assert!(result.is_err()); @@ -218,11 +228,28 @@ mod tests { name: "a".to_string(), datatype: 12, // string is_nullable: true, + default_constraint: None, }; let column_schema = create_column_schema(&column_def).unwrap(); assert_eq!(column_schema.name, "a"); assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); assert!(column_schema.is_nullable); + + let default_constraint = ColumnDefaultConstraint::Value(Value::from("defaut value")); + let column_def = ColumnDef { + name: "a".to_string(), + datatype: 12, // string + is_nullable: true, + default_constraint: Some(default_constraint.clone().try_into().unwrap()), + }; + let column_schema = create_column_schema(&column_def).unwrap(); + assert_eq!(column_schema.name, "a"); + assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); + assert!(column_schema.is_nullable); + assert_eq!( + default_constraint, + column_schema.default_constraint.unwrap() + ); } fn testing_create_expr() -> CreateExpr { @@ -231,21 +258,25 @@ mod tests { name: "host".to_string(), datatype: 12, // string is_nullable: false, + default_constraint: None, }, ColumnDef { name: "ts".to_string(), datatype: 15, // timestamp is_nullable: 
false, + default_constraint: None, }, ColumnDef { name: "cpu".to_string(), datatype: 9, // float32 is_nullable: true, + default_constraint: None, }, ColumnDef { name: "memory".to_string(), datatype: 10, // float64 is_nullable: true, + default_constraint: None, }, ]; CreateExpr { @@ -267,25 +298,30 @@ mod tests { name: "host".to_string(), data_type: ConcreteDataType::string_datatype(), is_nullable: false, + default_constraint: None, }, ColumnSchema { name: "ts".to_string(), data_type: ConcreteDataType::timestamp_millis_datatype(), is_nullable: false, + default_constraint: None, }, ColumnSchema { name: "cpu".to_string(), data_type: ConcreteDataType::float32_datatype(), is_nullable: true, + default_constraint: None, }, ColumnSchema { name: "memory".to_string(), data_type: ConcreteDataType::float64_datatype(), is_nullable: true, + default_constraint: None, }, ]; Arc::new( - SchemaBuilder::from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .unwrap() .timestamp_index(1) .build() .unwrap(), diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 9b539b5eb1..60bb309c38 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -297,7 +297,8 @@ mod tests { ]; Arc::new( - SchemaBuilder::from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .unwrap() .timestamp_index(3) .build() .unwrap(), diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 11af1ae71a..a726b52f68 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -97,7 +97,8 @@ mod tests { ]; Arc::new( - SchemaBuilder::from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .unwrap() .timestamp_index(3) .build() .unwrap(), diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 8a341294d5..e409b01cce 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -135,7 +135,8 @@ impl SqlHandler { .collect::>>()?; let schema = 
Arc::new( - SchemaBuilder::from(columns_schemas) + SchemaBuilder::try_from(columns_schemas) + .context(CreateSchemaSnafu)? .timestamp_index(ts_index) .build() .context(CreateSchemaSnafu)?, diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index 99b488356c..e4a0d52db7 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -1,20 +1,17 @@ -use std::str::FromStr; - use catalog::SchemaProviderRef; use common_query::Output; use datatypes::prelude::ConcreteDataType; use datatypes::prelude::VectorBuilder; -use datatypes::value::Value; use snafu::ensure; use snafu::OptionExt; use snafu::ResultExt; use sql::ast::Value as SqlValue; -use sql::statements::insert::Insert; +use sql::statements::{self, insert::Insert}; use table::requests::*; use crate::error::{ - ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, - ParseSqlValueSnafu, Result, TableNotFoundSnafu, + ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlValueSnafu, Result, + TableNotFoundSnafu, }; use crate::sql::{SqlHandler, SqlRequest}; @@ -118,217 +115,9 @@ fn add_row_to_vector( sql_val: &SqlValue, builder: &mut VectorBuilder, ) -> Result<()> { - let value = parse_sql_value(column_name, data_type, sql_val)?; + let value = statements::sql_value_to_value(column_name, data_type, sql_val) + .context(ParseSqlValueSnafu)?; builder.push(&value); Ok(()) } - -fn parse_sql_value( - column_name: &str, - data_type: &ConcreteDataType, - sql_val: &SqlValue, -) -> Result { - Ok(match sql_val { - SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?, - SqlValue::Null => Value::Null, - SqlValue::Boolean(b) => { - ensure!( - data_type.is_boolean(), - ColumnTypeMismatchSnafu { - column_name, - expect: data_type.clone(), - actual: ConcreteDataType::boolean_datatype(), - } - ); - - (*b).into() - } - SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => { - ensure!( - data_type.is_string(), - 
ColumnTypeMismatchSnafu { - column_name, - expect: data_type.clone(), - actual: ConcreteDataType::string_datatype(), - } - ); - - parse_string_to_value(s.to_owned(), data_type)? - } - _ => todo!("Other sql value"), - }) -} - -fn parse_string_to_value(s: String, data_type: &ConcreteDataType) -> Result { - match data_type { - ConcreteDataType::String(_) => Ok(Value::String(s.into())), - ConcreteDataType::Date(_) => { - if let Ok(date) = common_time::date::Date::from_str(&s) { - Ok(Value::Date(date)) - } else { - ParseSqlValueSnafu { - msg: format!("Failed to parse {} to Date value", s), - } - .fail() - } - } - ConcreteDataType::DateTime(_) => { - if let Ok(datetime) = common_time::datetime::DateTime::from_str(&s) { - Ok(Value::DateTime(datetime)) - } else { - ParseSqlValueSnafu { - msg: format!("Failed to parse {} to DateTime value", s), - } - .fail() - } - } - _ => { - unreachable!() - } - } -} - -macro_rules! parse_number_to_value { - ($data_type: expr, $n: ident, $(($Type: ident, $PrimitiveType: ident)), +) => { - match $data_type { - $( - ConcreteDataType::$Type(_) => { - let n = parse_sql_number::<$PrimitiveType>($n)?; - Ok(Value::from(n)) - }, - )+ - _ => ParseSqlValueSnafu { - msg: format!("Fail to parse number {}, invalid column type: {:?}", - $n, $data_type - )}.fail(), - } - } -} - -fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Result { - parse_number_to_value!( - data_type, - n, - (UInt8, u8), - (UInt16, u16), - (UInt32, u32), - (UInt64, u64), - (Int8, i8), - (Int16, i16), - (Int32, i32), - (Int64, i64), - (Float64, f64), - (Float32, f32), - (Timestamp, i64) - ) - // TODO(hl): also Date/DateTime -} - -fn parse_sql_number(n: &str) -> Result -where - ::Err: std::fmt::Debug, -{ - match n.parse::() { - Ok(n) => Ok(n), - Err(e) => ParseSqlValueSnafu { - msg: format!("Fail to parse number {}, {:?}", n, e), - } - .fail(), - } -} - -#[cfg(test)] -mod tests { - use datatypes::value::OrderedFloat; - - use super::*; - - #[test] - fn 
test_sql_number_to_value() { - let v = sql_number_to_value(&ConcreteDataType::float64_datatype(), "3.0").unwrap(); - assert_eq!(Value::Float64(OrderedFloat(3.0)), v); - - let v = sql_number_to_value(&ConcreteDataType::int32_datatype(), "999").unwrap(); - assert_eq!(Value::Int32(999), v); - - let v = sql_number_to_value(&ConcreteDataType::string_datatype(), "999"); - assert!(v.is_err(), "parse value error is: {:?}", v); - } - - #[test] - fn test_parse_sql_value() { - let sql_val = SqlValue::Null; - assert_eq!( - Value::Null, - parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap() - ); - - let sql_val = SqlValue::Boolean(true); - assert_eq!( - Value::Boolean(true), - parse_sql_value("a", &ConcreteDataType::boolean_datatype(), &sql_val).unwrap() - ); - - let sql_val = SqlValue::Number("3.0".to_string(), false); - assert_eq!( - Value::Float64(OrderedFloat(3.0)), - parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val).unwrap() - ); - - let sql_val = SqlValue::Number("3.0".to_string(), false); - let v = parse_sql_value("a", &ConcreteDataType::boolean_datatype(), &sql_val); - assert!(v.is_err()); - assert!(format!("{:?}", v) - .contains("Fail to parse number 3.0, invalid column type: Boolean(BooleanType)")); - - let sql_val = SqlValue::Boolean(true); - let v = parse_sql_value("a", &ConcreteDataType::float64_datatype(), &sql_val); - assert!(v.is_err()); - assert!(format!("{:?}", v).contains( - "column_name: \"a\", expect: Float64(Float64), actual: Boolean(BooleanType)" - )); - } - - #[test] - pub fn test_parse_date_literal() { - let value = parse_sql_value( - "date", - &ConcreteDataType::date_datatype(), - &SqlValue::DoubleQuotedString("2022-02-22".to_string()), - ) - .unwrap(); - assert_eq!(ConcreteDataType::date_datatype(), value.data_type()); - if let Value::Date(d) = value { - assert_eq!("2022-02-22", d.to_string()); - } else { - unreachable!() - } - } - - #[test] - pub fn test_parse_datetime_literal() { - let value = 
parse_sql_value( - "datetime_col", - &ConcreteDataType::datetime_datatype(), - &SqlValue::DoubleQuotedString("2022-02-22 00:01:03".to_string()), - ) - .unwrap(); - assert_eq!(ConcreteDataType::date_datatype(), value.data_type()); - if let Value::DateTime(d) = value { - assert_eq!("2022-02-22 00:01:03", d.to_string()); - } else { - unreachable!() - } - } - - #[test] - pub fn test_parse_illegal_datetime_literal() { - assert!(parse_sql_value( - "datetime_col", - &ConcreteDataType::datetime_datatype(), - &SqlValue::DoubleQuotedString("2022-02-22 00:01:61".to_string()), - ) - .is_err()); - } -} diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 215ce5d61a..5d2526d982 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -97,6 +97,7 @@ async fn test_insert_and_select() { name: "test_column".to_string(), datatype: ColumnDataType::Int64.into(), is_nullable: true, + default_constraint: None, }; let kind = Kind::AddColumn(AddColumn { column_def: Some(add_column), @@ -162,21 +163,25 @@ fn testing_create_expr() -> CreateExpr { name: "host".to_string(), datatype: 12, // string is_nullable: false, + default_constraint: None, }, ColumnDef { name: "cpu".to_string(), datatype: 10, // float64 is_nullable: true, + default_constraint: None, }, ColumnDef { name: "memory".to_string(), datatype: 10, // float64 is_nullable: true, + default_constraint: None, }, ColumnDef { name: "ts".to_string(), datatype: 15, // timestamp is_nullable: true, + default_constraint: None, }, ]; CreateExpr { diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index d949ee2bea..b18124038a 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -63,7 +63,8 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { table_name: table_name.to_string(), desc: Some(" a test table".to_string()), schema: Arc::new( - SchemaBuilder::from(column_schemas) + 
SchemaBuilder::try_from(column_schemas) + .unwrap() .timestamp_index(3) .build() .expect("ts is expected to be timestamp column"), diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index efa105a312..2e4ad9136d 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -12,6 +12,13 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Failed to deserialize data, source: {}, json: {}", source, json))] + Deserialize { + source: serde_json::Error, + backtrace: Backtrace, + json: String, + }, + #[snafu(display("Failed to convert datafusion type: {}", from))] Conversion { from: String, backtrace: Backtrace }, diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index e588200e62..1da88769fc 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; pub use arrow::datatypes::Metadata; @@ -7,7 +7,36 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use crate::data_type::{ConcreteDataType, DataType}; -use crate::error::{self, Error, Result}; +use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu}; +use crate::value::Value; + +/// Column's default constraint. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ColumnDefaultConstraint { + // A function invocation + // TODO(dennis): we save the function expression here, maybe use a struct in future. 
+ Function(String), + // A value + Value(Value), +} + +impl TryFrom<&[u8]> for ColumnDefaultConstraint { + type Error = error::Error; + + fn try_from(bytes: &[u8]) -> Result { + let json = String::from_utf8_lossy(bytes); + serde_json::from_str(&json).context(DeserializeSnafu { json }) + } +} + +impl TryInto> for ColumnDefaultConstraint { + type Error = error::Error; + + fn try_into(self) -> Result> { + let s = serde_json::to_string(&self).context(SerializeSnafu)?; + Ok(s.into_bytes()) + } +} /// Key used to store column name of the timestamp column in metadata. /// @@ -18,12 +47,15 @@ use crate::error::{self, Error, Result}; const TIMESTAMP_COLUMN_KEY: &str = "greptime:timestamp_column"; /// Key used to store version number of the schema in metadata. const VERSION_KEY: &str = "greptime:version"; +/// Key used to store default constraint in arrow field's metadata. +const ARROW_FIELD_DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint"; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ColumnSchema { pub name: String, pub data_type: ConcreteDataType, pub is_nullable: bool, + pub default_constraint: Option, } impl ColumnSchema { @@ -36,8 +68,17 @@ impl ColumnSchema { name: name.into(), data_type, is_nullable, + default_constraint: None, } } + + pub fn with_default_constraint( + mut self, + default_constraint: Option, + ) -> Self { + self.default_constraint = default_constraint; + self + } } /// A common schema, should be immutable. @@ -61,9 +102,20 @@ impl Schema { /// Initial version of the schema. pub const INITIAL_VERSION: u32 = 0; + /// Create a schema from a vector of [ColumnSchema]. + /// # Panics + /// Panics when ColumnSchema's `default_constrait` can't be serialized into json. 
pub fn new(column_schemas: Vec) -> Schema { // Builder won't fail - SchemaBuilder::from(column_schemas).build().unwrap() + SchemaBuilder::try_from(column_schemas) + .unwrap() + .build() + .unwrap() + } + + pub fn try_new(column_schemas: Vec) -> Result { + // Builder won't fail + Ok(SchemaBuilder::try_from(column_schemas)?.build().unwrap()) } #[inline] @@ -137,22 +189,24 @@ pub struct SchemaBuilder { metadata: Metadata, } -impl From> for SchemaBuilder { - fn from(column_schemas: Vec) -> SchemaBuilder { - SchemaBuilder::from_columns(column_schemas) +impl TryFrom> for SchemaBuilder { + type Error = Error; + + fn try_from(column_schemas: Vec) -> Result { + SchemaBuilder::try_from_columns(column_schemas) } } impl SchemaBuilder { - pub fn from_columns(column_schemas: Vec) -> Self { - let (fields, name_to_index) = collect_fields(&column_schemas); + pub fn try_from_columns(column_schemas: Vec) -> Result { + let (fields, name_to_index) = collect_fields(&column_schemas)?; - Self { + Ok(Self { column_schemas, name_to_index, fields, ..Default::default() - } + }) } /// Set timestamp index. 
@@ -198,16 +252,16 @@ impl SchemaBuilder { } } -fn collect_fields(column_schemas: &[ColumnSchema]) -> (Vec, HashMap) { +fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<(Vec, HashMap)> { let mut fields = Vec::with_capacity(column_schemas.len()); let mut name_to_index = HashMap::with_capacity(column_schemas.len()); for (index, column_schema) in column_schemas.iter().enumerate() { - let field = Field::from(column_schema); + let field = Field::try_from(column_schema)?; fields.push(field); name_to_index.insert(column_schema.name.clone(), index); } - (fields, name_to_index) + Ok((fields, name_to_index)) } fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> { @@ -236,22 +290,41 @@ impl TryFrom<&Field> for ColumnSchema { fn try_from(field: &Field) -> Result { let data_type = ConcreteDataType::try_from(&field.data_type)?; + let default_constraint = match field.metadata.get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY) { + Some(json) => Some(serde_json::from_str(json).context(DeserializeSnafu { json })?), + None => None, + }; Ok(ColumnSchema { name: field.name.clone(), data_type, is_nullable: field.is_nullable, + default_constraint, }) } } -impl From<&ColumnSchema> for Field { - fn from(column_schema: &ColumnSchema) -> Field { - Field::new( +impl TryFrom<&ColumnSchema> for Field { + type Error = Error; + + fn try_from(column_schema: &ColumnSchema) -> Result { + let metadata = if let Some(value) = &column_schema.default_constraint { + let mut m = BTreeMap::new(); + m.insert( + ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(), + serde_json::to_string(&value).context(SerializeSnafu)?, + ); + m + } else { + BTreeMap::default() + }; + + Ok(Field::new( column_schema.name.clone(), column_schema.data_type.as_arrow_type(), column_schema.is_nullable, ) + .with_metadata(metadata)) } } @@ -319,7 +392,7 @@ mod tests { #[test] fn test_column_schema() { let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true); - 
let field = Field::from(&column_schema); + let field = Field::try_from(&column_schema).unwrap(); assert_eq!("test", field.name); assert_eq!(ArrowDataType::Int32, field.data_type); assert!(field.is_nullable); @@ -328,6 +401,36 @@ mod tests { assert_eq!(column_schema, new_column_schema); } + #[test] + fn test_column_schema_with_default_constraint() { + let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99)))); + let field = Field::try_from(&column_schema).unwrap(); + assert_eq!("test", field.name); + assert_eq!(ArrowDataType::Int32, field.data_type); + assert!(field.is_nullable); + assert_eq!( + "{\"Value\":{\"Int32\":99}}", + field + .metadata + .get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY) + .unwrap() + ); + + let new_column_schema = ColumnSchema::try_from(&field).unwrap(); + assert_eq!(column_schema, new_column_schema); + } + + #[test] + fn test_column_default_constraint_try_into_from() { + let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64)); + + let bytes: Vec = default_constraint.clone().try_into().unwrap(); + let from_value = ColumnDefaultConstraint::try_from(&bytes[..]).unwrap(); + + assert_eq!(default_constraint, from_value); + } + #[test] fn test_build_empty_schema() { let schema = SchemaBuilder::default().build().unwrap(); @@ -370,7 +473,8 @@ mod tests { ConcreteDataType::int32_datatype(), false, )]; - let schema = SchemaBuilder::from(column_schemas) + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() .add_metadata("k1", "v1") .build() .unwrap(); @@ -384,7 +488,8 @@ mod tests { ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), ]; - let schema = SchemaBuilder::from(column_schemas.clone()) + let schema = SchemaBuilder::try_from(column_schemas.clone()) + .unwrap() .timestamp_index(1) .version(123) .build() @@ -405,15 
+510,18 @@ mod tests { ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false), ]; - assert!(SchemaBuilder::from(column_schemas.clone()) + assert!(SchemaBuilder::try_from(column_schemas.clone()) + .unwrap() .timestamp_index(0) .build() .is_err()); - assert!(SchemaBuilder::from(column_schemas.clone()) + assert!(SchemaBuilder::try_from(column_schemas.clone()) + .unwrap() .timestamp_index(1) .build() .is_err()); - assert!(SchemaBuilder::from(column_schemas) + assert!(SchemaBuilder::try_from(column_schemas) + .unwrap() .timestamp_index(1) .build() .is_err()); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 470063a6c2..1a06497a61 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -42,6 +42,17 @@ pub enum Error { source: api::error::Error, }, + #[snafu(display( + "Failed to convert column default constraint, column: {}, source: {}", + column_name, + source + ))] + ConvertColumnDefaultConstraint { + column_name: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + #[snafu(display("Invalid SQL, error: {}", err_msg))] InvalidSql { err_msg: String, @@ -66,6 +77,7 @@ impl ErrorExt for Error { Error::RuntimeResource { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), Error::ParseSql { source } => source.status_code(), + Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(), Error::ColumnDataType { .. } => StatusCode::Internal, Error::IllegalFrontendState { .. 
} => StatusCode::Unexpected, } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f2f67038a1..9dc98b3ae0 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -21,7 +21,7 @@ use sql::statements::statement::Statement; use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use sql::{dialect::GenericDialect, parser::ParserContext}; -use crate::error::{self, Result}; +use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result}; use crate::frontend::FrontendOptions; pub(crate) type InstanceRef = Arc; @@ -206,15 +206,25 @@ fn columns_to_expr(column_defs: &[ColumnDef]) -> Result> { }) .collect::>>()?; - Ok(column_schemas + column_schemas .iter() .zip(column_datatypes.into_iter()) - .map(|(schema, datatype)| GrpcColumnDef { - name: schema.name.clone(), - datatype: datatype as i32, - is_nullable: schema.is_nullable, + .map(|(schema, datatype)| { + Ok(GrpcColumnDef { + name: schema.name.clone(), + datatype: datatype as i32, + is_nullable: schema.is_nullable, + default_constraint: match &schema.default_constraint { + None => None, + Some(v) => Some(v.clone().try_into().context( + ConvertColumnDefaultConstraintSnafu { + column_name: &schema.name, + }, + )?), + }, + }) }) - .collect::>()) + .collect() } #[async_trait] @@ -257,6 +267,8 @@ mod tests { use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::instance::Instance as DatanodeInstance; + use datatypes::schema::ColumnDefaultConstraint; + use datatypes::value::Value; use servers::grpc::GrpcServer; use tempdir::TempDir; use tonic::transport::{Endpoint, Server}; @@ -276,6 +288,7 @@ mod tests { ts TIMESTAMP, cpu DOUBLE NULL, memory DOUBLE NULL, + disk_util DOUBLE DEFAULT 9.9, TIME INDEX (ts), PRIMARY KEY(ts, host) ) engine=mito with(regions=1);"#; @@ -314,13 +327,13 @@ mod tests { let pretty_print = arrow_print::write(&recordbatches); let pretty_print = 
pretty_print.lines().collect::>(); let expected = vec![ - "+----------------+---------------------+-----+--------+", - "| host | ts | cpu | memory |", - "+----------------+---------------------+-----+--------+", - "| frontend.host1 | 1970-01-01 00:00:01 | 1.1 | 100 |", - "| frontend.host2 | 1970-01-01 00:00:02 | | |", - "| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 |", - "+----------------+---------------------+-----+--------+", + "+----------------+---------------------+-----+--------+-----------+", + "| host | ts | cpu | memory | disk_util |", + "+----------------+---------------------+-----+--------+-----------+", + "| frontend.host1 | 1970-01-01 00:00:01 | 1.1 | 100 | 9.9 |", + "| frontend.host2 | 1970-01-01 00:00:02 | | | 9.9 |", + "| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 | 9.9 |", + "+----------------+---------------------+-----+--------+-----------+", ]; assert_eq!(pretty_print, expected); } @@ -341,12 +354,12 @@ mod tests { let pretty_print = arrow_print::write(&recordbatches); let pretty_print = pretty_print.lines().collect::>(); let expected = vec![ - "+----------------+---------------------+-----+--------+", - "| host | ts | cpu | memory |", - "+----------------+---------------------+-----+--------+", - "| frontend.host2 | 1970-01-01 00:00:02 | | |", - "| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 |", - "+----------------+---------------------+-----+--------+", + "+----------------+---------------------+-----+--------+-----------+", + "| host | ts | cpu | memory | disk_util |", + "+----------------+---------------------+-----+--------+-----------+", + "| frontend.host2 | 1970-01-01 00:00:02 | | | 9.9 |", + "| frontend.host3 | 1970-01-01 00:00:03 | 3.3 | 300 | 9.9 |", + "+----------------+---------------------+-----+--------+-----------+", ]; assert_eq!(pretty_print, expected); } @@ -394,6 +407,15 @@ mod tests { datatype: Some(10), // float64 ..Default::default() }; + let expected_disk_col = Column { + column_name: 
"disk_util".to_string(), + values: Some(column::Values { + f64_values: vec![9.9, 9.9, 9.9, 9.9], + ..Default::default() + }), + datatype: Some(10), // float64 + ..Default::default() + }; let expected_ts_col = Column { column_name: "ts".to_string(), values: Some(column::Values { @@ -467,13 +489,14 @@ mod tests { assert_eq!(4, select_result.row_count); let actual_columns = select_result.columns; - assert_eq!(4, actual_columns.len()); + assert_eq!(5, actual_columns.len()); // Respect the order in create table schema let expected_columns = vec![ expected_host_col, expected_cpu_col, expected_mem_col, + expected_disk_col, expected_ts_col, ]; expected_columns @@ -548,21 +571,35 @@ mod tests { name: "host".to_string(), datatype: 12, // string is_nullable: false, + default_constraint: None, }, GrpcColumnDef { name: "cpu".to_string(), datatype: 10, // float64 is_nullable: true, + default_constraint: None, }, GrpcColumnDef { name: "memory".to_string(), datatype: 10, // float64 is_nullable: true, + default_constraint: None, + }, + GrpcColumnDef { + name: "disk_util".to_string(), + datatype: 10, // float64 + is_nullable: true, + default_constraint: Some( + ColumnDefaultConstraint::Value(Value::from(9.9f64)) + .try_into() + .unwrap(), + ), }, GrpcColumnDef { name: "ts".to_string(), datatype: 15, // timestamp is_nullable: true, + default_constraint: None, }, ]; CreateExpr { diff --git a/src/script/src/table.rs b/src/script/src/table.rs index db68e7c7d0..d99af75ffc 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -214,7 +214,9 @@ fn build_scripts_schema() -> Schema { ), ]; - SchemaBuilder::from(cols) + // Schema is always valid here + SchemaBuilder::try_from(cols) + .unwrap() .timestamp_index(3) .build() .unwrap() diff --git a/src/servers/tests/postgres/mod.rs b/src/servers/tests/postgres/mod.rs index 8d9abddf9b..2970a92959 100644 --- a/src/servers/tests/postgres/mod.rs +++ b/src/servers/tests/postgres/mod.rs @@ -65,14 +65,20 @@ async fn 
test_shutdown_pg_server() -> Result<()> { for _ in 0..1000 { match create_connection(server_port).await { Ok(connection) => { - let rows = connection + match connection .simple_query("SELECT uint32s FROM numbers LIMIT 1") .await - .unwrap(); - let result_text = unwrap_results(&rows)[0]; - let result: i32 = result_text.parse().unwrap(); - assert_eq!(result, 0); - tokio::time::sleep(Duration::from_millis(10)).await; + { + Ok(rows) => { + let result_text = unwrap_results(&rows)[0]; + let result: i32 = result_text.parse().unwrap(); + assert_eq!(result, 0); + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(e) => { + return Err(e); + } + } } Err(e) => { return Err(e); @@ -91,7 +97,11 @@ async fn test_shutdown_pg_server() -> Result<()> { let result = handle.await.unwrap(); assert!(result.is_err()); let error = result.unwrap_err().to_string(); - assert!(error.contains("Connection refused") || error.contains("Connection reset by peer")); + assert!( + error.contains("Connection refused") + || error.contains("Connection reset by peer") + || error.contains("close") + ); } Ok(()) diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index ce6084bb0a..5365b5d179 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] common-error = { path = "../common/error" } +common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.15.0" diff --git a/src/sql/src/ast.rs b/src/sql/src/ast.rs index c97ebf657f..bd28b1fcb4 100644 --- a/src/sql/src/ast.rs +++ b/src/sql/src/ast.rs @@ -1,4 +1,4 @@ pub use sqlparser::ast::{ - ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Ident, ObjectName, SqlOption, - TableConstraint, Value, + ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Function, FunctionArg, + FunctionArgExpr, Ident, ObjectName, SqlOption, TableConstraint, Value, }; diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs 
index ea46e8c6a7..c922e02127 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -1,9 +1,12 @@ use std::any::Any; use common_error::prelude::*; +use datatypes::prelude::ConcreteDataType; use sqlparser::parser::ParserError; use sqlparser::tokenizer::TokenizerError; +use crate::ast::Expr; + pub type Result = std::result::Result; /// SQL parser errors. @@ -29,6 +32,17 @@ pub enum Error { source: ParserError, }, + #[snafu(display( + "Unsupported expr in default constraint: {} for column: {}", + expr, + column_name + ))] + UnsupportedDefaultValue { + column_name: String, + expr: Expr, + backtrace: Backtrace, + }, + // Syntax error from sql parser. #[snafu(display("Syntax error, sql: {}, source: {}", sql, source))] Syntax { sql: String, source: ParserError }, @@ -50,6 +64,21 @@ pub enum Error { t: crate::ast::DataType, backtrace: Backtrace, }, + + #[snafu(display("Failed to parse value: {}, {}", msg, backtrace))] + ParseSqlValue { msg: String, backtrace: Backtrace }, + + #[snafu(display( + "Column {} expect type: {:?}, actual: {:?}", + column_name, + expect, + actual + ))] + ColumnTypeMismatch { + column_name: String, + expect: ConcreteDataType, + actual: ConcreteDataType, + }, } impl ErrorExt for Error { @@ -57,13 +86,16 @@ impl ErrorExt for Error { use Error::*; match self { - Unsupported { .. } => StatusCode::Unsupported, + UnsupportedDefaultValue { .. } | Unsupported { .. } => StatusCode::Unsupported, Unexpected { .. } | Syntax { .. } | InvalidTimeIndex { .. } | Tokenizer { .. } | InvalidSql { .. } + | ParseSqlValue { .. } | SqlTypeNotSupported { .. } => StatusCode::InvalidSyntax, + + ColumnTypeMismatch { .. 
} => StatusCode::InvalidArguments, } } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 72c06cc684..7d699a2445 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -6,15 +6,24 @@ pub mod show_database; pub mod show_kind; pub mod statement; -use datatypes::prelude::ConcreteDataType; -use datatypes::schema::ColumnSchema; -use datatypes::types::DateTimeType; +use std::str::FromStr; -use crate::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName}; -use crate::error::{self, Result}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; +use datatypes::types::DateTimeType; +use datatypes::value::Value; +use snafu::ensure; + +use crate::ast::{ + ColumnDef, ColumnOption, ColumnOptionDef, DataType as SqlDataType, Expr, ObjectName, + Value as SqlValue, +}; +use crate::error::{ + self, ColumnTypeMismatchSnafu, ParseSqlValueSnafu, Result, UnsupportedDefaultValueSnafu, +}; /// Converts maybe fully-qualified table name (`..` or `
` when -/// catalog and schema are default) to tuple. +/// catalog and schema are default) to tuple. pub fn table_idents_to_full_name( obj_name: &ObjectName, ) -> Result<(Option, Option, String)> { @@ -35,15 +44,175 @@ pub fn table_idents_to_full_name( } } +fn parse_string_to_value( + column_name: &str, + s: String, + data_type: &ConcreteDataType, +) -> Result { + ensure!( + data_type.is_string(), + ColumnTypeMismatchSnafu { + column_name, + expect: data_type.clone(), + actual: ConcreteDataType::string_datatype(), + } + ); + + match data_type { + ConcreteDataType::String(_) => Ok(Value::String(s.into())), + ConcreteDataType::Date(_) => { + if let Ok(date) = common_time::date::Date::from_str(&s) { + Ok(Value::Date(date)) + } else { + ParseSqlValueSnafu { + msg: format!("Failed to parse {} to Date value", s), + } + .fail() + } + } + ConcreteDataType::DateTime(_) => { + if let Ok(datetime) = common_time::datetime::DateTime::from_str(&s) { + Ok(Value::DateTime(datetime)) + } else { + ParseSqlValueSnafu { + msg: format!("Failed to parse {} to DateTime value", s), + } + .fail() + } + } + _ => { + unreachable!() + } + } +} + +macro_rules! 
parse_number_to_value { + ($data_type: expr, $n: ident, $(($Type: ident, $PrimitiveType: ident)), +) => { + match $data_type { + $( + ConcreteDataType::$Type(_) => { + let n = parse_sql_number::<$PrimitiveType>($n)?; + Ok(Value::from(n)) + }, + )+ + _ => ParseSqlValueSnafu { + msg: format!("Fail to parse number {}, invalid column type: {:?}", + $n, $data_type + )}.fail(), + } + } +} + +/// Convert a sql value into datatype's value +pub fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Result { + parse_number_to_value!( + data_type, + n, + (UInt8, u8), + (UInt16, u16), + (UInt32, u32), + (UInt64, u64), + (Int8, i8), + (Int16, i16), + (Int32, i32), + (Int64, i64), + (Float64, f64), + (Float32, f32), + (Timestamp, i64) + ) + // TODO(hl): also Date/DateTime +} + +fn parse_sql_number(n: &str) -> Result +where + ::Err: std::fmt::Debug, +{ + match n.parse::() { + Ok(n) => Ok(n), + Err(e) => ParseSqlValueSnafu { + msg: format!("Fail to parse number {}, {:?}", n, e), + } + .fail(), + } +} + +pub fn sql_value_to_value( + column_name: &str, + data_type: &ConcreteDataType, + sql_val: &SqlValue, +) -> Result { + Ok(match sql_val { + SqlValue::Number(n, _) => sql_number_to_value(data_type, n)?, + SqlValue::Null => Value::Null, + SqlValue::Boolean(b) => { + ensure!( + data_type.is_boolean(), + ColumnTypeMismatchSnafu { + column_name, + expect: data_type.clone(), + actual: ConcreteDataType::boolean_datatype(), + } + ); + + (*b).into() + } + SqlValue::DoubleQuotedString(s) | SqlValue::SingleQuotedString(s) => { + parse_string_to_value(column_name, s.to_owned(), data_type)? 
+ } + _ => todo!("Other sql value"), + }) +} + +fn parse_column_default_constraint( + column_name: &str, + data_type: &ConcreteDataType, + opts: &[ColumnOptionDef], +) -> Result> { + if let Some(opt) = opts + .iter() + .find(|o| matches!(o.option, ColumnOption::Default(_))) + { + let default_constraint = match &opt.option { + ColumnOption::Default(Expr::Value(v)) => { + ColumnDefaultConstraint::Value(sql_value_to_value(column_name, data_type, v)?) + } + ColumnOption::Default(Expr::Function(func)) => { + // Always use lowercase for function expression + ColumnDefaultConstraint::Function(format!("{}", func).to_lowercase()) + } + ColumnOption::Default(expr) => { + return UnsupportedDefaultValueSnafu { + column_name, + expr: expr.clone(), + } + .fail(); + } + _ => unreachable!(), + }; + + Ok(Some(default_constraint)) + } else { + Ok(None) + } +} + +/// Create a `ColumnSchema` from `ColumnDef`. pub fn column_def_to_schema(column_def: &ColumnDef) -> Result { let is_nullable = column_def .options .iter() .any(|o| matches!(o.option, ColumnOption::Null)); + + let name = column_def.name.value.clone(); + let data_type = sql_data_type_to_concrete_data_type(&column_def.data_type)?; + let default_constraint = + parse_column_default_constraint(&name, &data_type, &column_def.options)?; + Ok(ColumnSchema { - name: column_def.name.value.clone(), - data_type: sql_data_type_to_concrete_data_type(&column_def.data_type)?, + name, + data_type, is_nullable, + default_constraint, }) } @@ -86,6 +255,8 @@ fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result) -> .collect(); if let Some(index) = timestamp_index { - SchemaBuilder::from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .unwrap() .timestamp_index(index) .build() .unwrap() diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index 6942904be0..b4810a18b8 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -80,12 +80,13 @@ 
impl TryFrom for schema::SchemaRef { let schema: schema::SchemaRef = match schema.timestamp_index { Some(index) => Arc::new( - schema::SchemaBuilder::from(column_schemas) + schema::SchemaBuilder::try_from(column_schemas) + .context(ConvertSchemaSnafu)? .timestamp_index(index.value as usize) .build() .context(ConvertSchemaSnafu)?, ), - None => Arc::new(schema::Schema::new(column_schemas)), + None => Arc::new(schema::Schema::try_new(column_schemas).context(ConvertSchemaSnafu)?), }; Ok(schema) diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index 29c0a16f9e..4a3a7dff3a 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -68,6 +68,12 @@ pub enum Error { source: datatypes::error::Error, }, + #[snafu(display("Failed to convert schema, source: {}", source))] + ConvertSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + #[snafu(display("Invalid projection, {}", msg))] InvalidProjection { msg: String, backtrace: Backtrace }, } @@ -255,7 +261,8 @@ impl StoreSchema { row_key_end: usize, user_column_end: usize, ) -> Result { - let schema = SchemaBuilder::from(column_schemas) + let schema = SchemaBuilder::try_from(column_schemas) + .context(ConvertSchemaSnafu)? .timestamp_index(timestamp_key_index) .version(version) .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string()) @@ -575,7 +582,9 @@ impl ProjectedSchema { .map(|col_idx| ColumnSchema::from(®ion_schema.column_metadata(*col_idx).desc)) .collect(); - let mut builder = SchemaBuilder::from(column_schemas).version(region_schema.version()); + let mut builder = SchemaBuilder::try_from(column_schemas) + .context(ConvertSchemaSnafu)? 
+ .version(region_schema.version()); if let Some(timestamp_index) = timestamp_index { builder = builder.timestamp_index(timestamp_index); } @@ -685,7 +694,8 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result .map(|col| ColumnSchema::from(&col.desc)) .collect(); - SchemaBuilder::from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .context(ConvertSchemaSnafu)? .timestamp_index(columns.timestamp_key_index()) .version(version) .build() diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs index df68559dde..d3161971e7 100644 --- a/src/storage/src/test_util/schema_util.rs +++ b/src/storage/src/test_util/schema_util.rs @@ -23,7 +23,9 @@ pub fn new_schema_with_version( }) .collect(); - let mut builder = SchemaBuilder::from(column_schemas).version(version); + let mut builder = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(version); if let Some(index) = timestamp_index { builder = builder.timestamp_index(index); } diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 4ab2bc0299..6994603dd7 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -9,7 +9,7 @@ use common_error::prelude::*; use common_time::{RangeMillis, TimestampMillis}; use datatypes::vectors::TimestampVector; use datatypes::{ - arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector, + arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector, prelude::Value, schema::SchemaRef, vectors::VectorRef, }; use prost::{DecodeError, EncodeError}; @@ -202,11 +202,22 @@ impl WriteRequest for WriteBatch { let column = put_data .column_by_name(ts_col_name) .unwrap_or_else(|| panic!("Cannot find column by name: {}", ts_col_name)); - let ts_vector = column.as_any().downcast_ref::().unwrap(); // not expected to fail - for ts in ts_vector.iter_data().flatten() { + if column.is_const() { + let ts = match column.get(0) { + 
Value::Timestamp(ts) => ts, + _ => unreachable!(), + }; let aligned = align_timestamp(ts.value(), durations_millis) .context(TimestampOverflowSnafu { ts: ts.value() })?; + aligned_timestamps.insert(aligned); + } else { + let ts_vector = column.as_any().downcast_ref::().unwrap(); // not expected to fail + for ts in ts_vector.iter_data().flatten() { + let aligned = align_timestamp(ts.value(), durations_millis) + .context(TimestampOverflowSnafu { ts: ts.value() })?; + aligned_timestamps.insert(aligned); + } } } } @@ -260,7 +271,7 @@ pub enum Mutation { Put(PutData), } -#[derive(Default)] +#[derive(Default, Debug)] pub struct PutData { columns: HashMap, } @@ -806,7 +817,9 @@ mod tests { use std::sync::Arc; use datatypes::type_id::LogicalTypeId; - use datatypes::vectors::{BooleanVector, Int32Vector, Int64Vector, UInt64Vector}; + use datatypes::vectors::{ + BooleanVector, ConstantVector, Int32Vector, Int64Vector, UInt64Vector, + }; use super::*; use crate::codec::{Decoder, Encoder}; @@ -1033,6 +1046,36 @@ mod tests { ) } + #[test] + pub fn test_write_batch_time_range_const_vector() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6])); + let tsv = Arc::new(ConstantVector::new( + Arc::new(TimestampVector::from_vec(vec![20])), + 6, + )); + let boolv = Arc::new(BooleanVector::from(vec![ + true, false, true, false, false, false, + ])); + + let mut put_data = PutData::new(); + put_data.add_key_column("k1", intv.clone()).unwrap(); + put_data.add_version_column(intv).unwrap(); + put_data.add_value_column("v1", boolv).unwrap(); + put_data.add_key_column("ts", tsv).unwrap(); + + let mut batch = new_test_batch(); + batch.put(put_data).unwrap(); + + let duration_millis = 20i64; + let ranges = batch + .time_ranges(Duration::from_millis(duration_millis as u64)) + .unwrap(); + assert_eq!( + [20].map(|v| RangeMillis::new(v, v + duration_millis).unwrap()), + ranges.as_slice() + ) + } + fn gen_new_batch_and_extras() -> (WriteBatch, Vec) { let mut batch = 
new_test_batch(); for i in 0..10 { diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 392959bcd0..acc50deb2c 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -12,7 +12,9 @@ mod snapshot; mod types; pub use datatypes::data_type::ConcreteDataType; -pub use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; +pub use datatypes::schema::{ + ColumnDefaultConstraint, ColumnSchema, Schema, SchemaBuilder, SchemaRef, +}; pub use self::chunk::{Chunk, ChunkReader}; pub use self::descriptors::*; diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index d73cd3ca12..a5808f3ef9 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -1,8 +1,7 @@ -use datatypes::value::Value; use derive_builder::Builder; use serde::{Deserialize, Serialize}; -use crate::storage::{consts, ColumnSchema, ConcreteDataType}; +use crate::storage::{consts, ColumnDefaultConstraint, ColumnSchema, ConcreteDataType}; /// Id of column, unique in each region. pub type ColumnId = u32; @@ -23,10 +22,10 @@ pub struct ColumnDescriptor { /// Is column nullable, default is true. #[builder(default = "true")] pub is_nullable: bool, - /// Default value of column, default is None, which means no default value - /// for this column, and user must provide value for a not-null column. + /// Default constraint of column, default is None, which means no default constraint + /// for this column, and user must provide a value for a not-null column. 
#[builder(default)] - pub default_value: Option, + pub default_constraint: Option, #[builder(default, setter(into))] pub comment: String, } @@ -45,6 +44,7 @@ impl ColumnDescriptorBuilder { impl From<&ColumnDescriptor> for ColumnSchema { fn from(desc: &ColumnDescriptor) -> ColumnSchema { ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable) + .with_default_constraint(desc.default_constraint.clone()) } } @@ -116,6 +116,8 @@ impl ColumnFamilyDescriptorBuilder { #[cfg(test)] mod tests { + use datatypes::value::Value; + use super::*; #[inline] @@ -130,7 +132,7 @@ mod tests { assert_eq!("test", desc.name); assert_eq!(ConcreteDataType::int32_datatype(), desc.data_type); assert!(desc.is_nullable); - assert!(desc.default_value.is_none()); + assert!(desc.default_constraint.is_none()); assert!(desc.comment.is_empty()); let desc = new_column_desc_builder() @@ -140,16 +142,22 @@ mod tests { assert!(!desc.is_nullable); let desc = new_column_desc_builder() - .default_value(Some(Value::Null)) + .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null))) .build() .unwrap(); - assert_eq!(Value::Null, desc.default_value.unwrap()); + assert_eq!( + ColumnDefaultConstraint::Value(Value::Null), + desc.default_constraint.unwrap() + ); let desc = new_column_desc_builder() - .default_value(Some(Value::Int32(123))) + .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(123)))) .build() .unwrap(); - assert_eq!(Value::Int32(123), desc.default_value.unwrap()); + assert_eq!( + ColumnDefaultConstraint::Value(Value::Int32(123)), + desc.default_constraint.unwrap() + ); let desc = new_column_desc_builder() .comment("A test column") diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 1309cc4baa..533c19858b 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -25,7 +25,7 @@ pub trait WriteRequest: Send { } /// Put multiple rows. 
-pub trait PutOperation: Send { +pub trait PutOperation: Send + std::fmt::Debug { type Error: ErrorExt + Send + Sync; fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>; diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 2da9ad5721..cd985920c8 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -143,6 +143,7 @@ pub(crate) fn build_row_key_desc( ts_column_schema.name.clone(), ts_column_schema.data_type.clone(), ) + .default_constraint(ts_column_schema.default_constraint.clone()) .is_nullable(ts_column_schema.is_nullable) .build() .context(BuildColumnDescriptorSnafu { @@ -168,6 +169,7 @@ pub(crate) fn build_row_key_desc( column_schema.name.clone(), column_schema.data_type.clone(), ) + .default_constraint(column_schema.default_constraint.clone()) .is_nullable(column_schema.is_nullable) .build() .context(BuildColumnDescriptorSnafu { @@ -210,6 +212,7 @@ pub(crate) fn build_column_family( column_schema.name.clone(), column_schema.data_type.clone(), ) + .default_constraint(column_schema.default_constraint.clone()) .is_nullable(column_schema.is_nullable) .build() .context(BuildColumnDescriptorSnafu { @@ -421,15 +424,141 @@ mod tests { use datafusion_common::field_util::SchemaExt; use datatypes::prelude::{ConcreteDataType, ScalarVector}; use datatypes::schema::ColumnSchema; + use datatypes::schema::{ColumnDefaultConstraint, SchemaBuilder}; + use datatypes::value::Value; use datatypes::vectors::*; + use log_store::fs::noop::NoopLogStore; + use storage::config::EngineConfig as StorageEngineConfig; + use storage::EngineImpl; use store_api::manifest::Manifest; use store_api::storage::ReadContext; use table::requests::{AlterKind, InsertRequest}; + use tempdir::TempDir; use super::*; use crate::table::test_util; use crate::table::test_util::{MockRegion, TABLE_NAME}; + async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) { + let table_name = 
"test_default_constraint"; + let column_schemas = vec![ + ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("n", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32)))), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), + true, + ), + ]; + + let schema = Arc::new( + SchemaBuilder::try_from(column_schemas) + .unwrap() + .timestamp_index(2) + .build() + .expect("ts must be timestamp column"), + ); + + let (dir, object_store) = + test_util::new_test_object_store("test_insert_with_column_default_constraint").await; + + let table_engine = MitoEngine::new( + EngineConfig::default(), + EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(NoopLogStore::default()), + object_store.clone(), + ), + object_store, + ); + + let table = table_engine + .create_table( + &EngineContext::default(), + CreateTableRequest { + id: 1, + catalog_name: None, + schema_name: None, + table_name: table_name.to_string(), + desc: Some("a test table".to_string()), + schema: schema.clone(), + create_if_not_exists: true, + primary_key_indices: Vec::default(), + table_options: HashMap::new(), + }, + ) + .await + .unwrap(); + + (dir, table_name.to_string(), table) + } + + #[tokio::test] + async fn test_column_default_constraint() { + let (_dir, table_name, table) = setup_table_with_column_default_constraint().await; + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let names = StringVector::from(vec!["first", "second"]); + let tss = TimestampVector::from_vec(vec![1, 2]); + + columns_values.insert("name".to_string(), Arc::new(names.clone())); + columns_values.insert("ts".to_string(), Arc::new(tss.clone())); + + let insert_req = InsertRequest { + table_name: table_name.to_string(), + columns_values, + }; + assert_eq!(2, table.insert(insert_req).await.unwrap()); + + let stream = table.scan(&None, &[], 
None).await.unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + + let record = &batches[0].df_recordbatch; + assert_eq!(record.num_columns(), 3); + let columns = record.columns(); + assert_eq!(3, columns.len()); + assert_eq!(names.to_arrow_array(), columns[0]); + assert_eq!( + Int32Vector::from_vec(vec![42, 42]).to_arrow_array(), + columns[1] + ); + assert_eq!(tss.to_arrow_array(), columns[2]); + } + + #[tokio::test] + async fn test_insert_with_column_default_constraint() { + let (_dir, table_name, table) = setup_table_with_column_default_constraint().await; + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let names = StringVector::from(vec!["first", "second"]); + let nums = Int32Vector::from(vec![None, Some(66)]); + let tss = TimestampVector::from_vec(vec![1, 2]); + + columns_values.insert("name".to_string(), Arc::new(names.clone())); + columns_values.insert("n".to_string(), Arc::new(nums.clone())); + columns_values.insert("ts".to_string(), Arc::new(tss.clone())); + + let insert_req = InsertRequest { + table_name: table_name.to_string(), + columns_values, + }; + assert_eq!(2, table.insert(insert_req).await.unwrap()); + + let stream = table.scan(&None, &[], None).await.unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + + let record = &batches[0].df_recordbatch; + assert_eq!(record.num_columns(), 3); + let columns = record.columns(); + assert_eq!(3, columns.len()); + assert_eq!(names.to_arrow_array(), columns[0]); + assert_eq!(nums.to_arrow_array(), columns[1]); + assert_eq!(tss.to_arrow_array(), columns[2]); + } + #[test] fn test_region_name() { assert_eq!("1_0000000000", region_name(1, 0)); diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index 59affca5b1..5b7100c8b0 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -165,6 +165,9 @@ pub enum Error { backtrace: Backtrace, column_qualified_name: 
String, }, + + #[snafu(display("Unsupported column default constraint: {}", expr))] + UnsupportedDefaultConstraint { expr: String, backtrace: Backtrace }, } impl From for table::error::Error { @@ -196,6 +199,7 @@ impl ErrorExt for Error { | ColumnExists { .. } | ProjectedColumnNotFound { .. } | MissingTimestampIndex { .. } + | UnsupportedDefaultConstraint { .. } | TableNotFound { .. } => StatusCode::InvalidArguments, TableInfoNotFound { .. } => StatusCode::Unexpected, diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index d6190d2628..2074ba1096 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -11,7 +11,12 @@ use common_query::logical_plan::Expr; use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult}; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use common_telemetry::logging; -use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use common_time::util; +use common_time::Timestamp; +use datatypes::data_type::DataType; +use datatypes::prelude::ScalarVector; +use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder}; +use datatypes::vectors::{ConstantVector, TimestampVector, VectorRef}; use futures::task::{Context, Poll}; use futures::Stream; use object_store::ObjectStore; @@ -34,8 +39,8 @@ use tokio::sync::Mutex; use crate::engine::{build_column_family, build_row_key_desc, INIT_COLUMN_ID}; use crate::error::{ - self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu, - UpdateTableManifestSnafu, + self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, SchemaBuildSnafu, + TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu, }; use crate::manifest::action::*; use crate::manifest::TableManifest; @@ -76,30 +81,43 @@ impl Table for MitoTable { let mut columns_values = request.columns_values; let table_info = self.table_info(); + let schema = 
self.schema(); let key_columns = table_info.meta.row_key_column_names(); let value_columns = table_info.meta.value_column_names(); + // columns_values is not empty, it's safe to unwrap + let rows_num = columns_values.values().next().unwrap().len(); //Add row key and columns for name in key_columns { + let vector = columns_values + .remove(name) + .or_else(|| { + Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()? + }) + .context(MissingColumnSnafu { name }) + .map_err(TableError::from)?; + put_op - .add_key_column( - name, - columns_values - .get(name) - .context(MissingColumnSnafu { name })? - .clone(), - ) + .add_key_column(name, vector) .map_err(TableError::new)?; } - // Add vaue columns - let mut rows_num = 0; for name in value_columns { - if let Some(v) = columns_values.remove(name) { - rows_num = v.len(); + let vector = columns_values.remove(name).or_else(|| { + Self::try_get_column_default_constraint_vector(&schema, name, rows_num).ok()? + }); + + if let Some(v) = vector { put_op.add_value_column(name, v).map_err(TableError::new)?; } } + + logging::debug!( + "Insert into table {} with put_op: {:?}", + table_info.name, + put_op + ); + write_request.put(put_op).map_err(TableError::new)?; let _resp = self @@ -272,7 +290,11 @@ fn build_table_schema_with_new_column( // Right now we are not support adding the column // before or after some column, so just clone a new schema like this. // TODO(LFC): support adding column before or after some column - let mut builder = SchemaBuilder::from_columns(columns).version(table_schema.version() + 1); + let mut builder = SchemaBuilder::try_from_columns(columns) + .context(SchemaBuildSnafu { + msg: "Failed to convert column schemas into table schema", + })? 
+ .version(table_schema.version() + 1); if let Some(index) = table_schema.timestamp_index() { builder = builder.timestamp_index(index); @@ -398,6 +420,50 @@ impl MitoTable { Ok(MitoTable::new(table_info, region, manifest)) } + fn try_get_column_default_constraint_vector( + schema: &SchemaRef, + name: &str, + rows_num: usize, + ) -> TableResult> { + // TODO(dennis): when we support altering schema, we should check the schemas difference between table and region + let column_schema = schema + .column_schema_by_name(name) + .expect("column schema not found"); + if let Some(v) = &column_schema.default_constraint { + assert!(rows_num > 0); + + match v { + ColumnDefaultConstraint::Value(v) => { + let mut mutable_vector = column_schema.data_type.create_mutable_vector(1); + mutable_vector + .push_value_ref(v.as_value_ref()) + .map_err(TableError::new)?; + let vector = + Arc::new(ConstantVector::new(mutable_vector.to_vector(), rows_num)); + Ok(Some(vector)) + } + ColumnDefaultConstraint::Function(expr) => { + match &expr[..] { + // TODO(dennis): we only support current_timestamp right now, + // it's better to use an expression framework in future.
+ "current_timestamp()" => { + let vector = + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + util::current_time_millis(), + )])); + Ok(Some(Arc::new(ConstantVector::new(vector, rows_num)))) + } + _ => UnsupportedDefaultConstraintSnafu { expr } + .fail() + .map_err(TableError::new), + } + } + } + } else { + Ok(None) + } + } + pub async fn open( table_name: &str, region: R, @@ -487,6 +553,7 @@ mod tests { use datatypes::prelude::ConcreteDataType; use super::*; + use crate::table::test_util; #[test] fn test_table_manifest_dir() { diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index b49f077720..82d8cb724f 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -36,7 +36,8 @@ pub fn schema_for_test() -> Schema { ), ]; - SchemaBuilder::from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .unwrap() .timestamp_index(3) .build() .expect("ts must be timestamp column")