diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a6e15470e5..01c5cd1e39 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -192,7 +192,7 @@ pub enum Error { source: api::error::Error, }, - #[snafu(display("Column default constraint error, source: {}", source))] + #[snafu(display("Invalid column default constraint, source: {}", source))] ColumnDefaultConstraint { #[snafu(backtrace)] source: datatypes::error::Error, diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 01868235d7..06cc6ce20a 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -147,17 +147,19 @@ fn create_table_schema(expr: &CreateExpr) -> Result { fn create_column_schema(column_def: &ColumnDef) -> Result { let data_type = ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?; - Ok(ColumnSchema { - name: column_def.name.clone(), - data_type: data_type.into(), - is_nullable: column_def.is_nullable, - default_constraint: match &column_def.default_constraint { - None => None, - Some(v) => Some( - ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?, - ), - }, - }) + let default_constraint = match &column_def.default_constraint { + None => None, + Some(v) => { + Some(ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?) + } + }; + ColumnSchema::new( + column_def.name.clone(), + data_type.into(), + column_def.is_nullable, + ) + .with_default_constraint(default_constraint) + .context(ColumnDefaultConstraintSnafu) } #[cfg(test)] @@ -237,7 +239,7 @@ mod tests { let column_schema = create_column_schema(&column_def).unwrap(); assert_eq!(column_schema.name, "a"); assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable); + assert!(column_schema.is_nullable()); let default_constraint = ColumnDefaultConstraint::Value(Value::from("defaut value")); let column_def = ColumnDef { @@ -249,10 +251,10 @@ mod tests { let column_schema = create_column_schema(&column_def).unwrap(); assert_eq!(column_schema.name, "a"); assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable); + assert!(column_schema.is_nullable()); assert_eq!( default_constraint, - column_schema.default_constraint.unwrap() + *column_schema.default_constraint().unwrap() ); } @@ -298,30 +300,10 @@ mod tests { fn expected_table_schema() -> SchemaRef { let column_schemas = vec![ - ColumnSchema { - name: "host".to_string(), - data_type: ConcreteDataType::string_datatype(), - is_nullable: false, - default_constraint: None, - }, - ColumnSchema { - name: "ts".to_string(), - data_type: ConcreteDataType::timestamp_millis_datatype(), - is_nullable: false, - default_constraint: None, - }, - ColumnSchema { - name: "cpu".to_string(), - data_type: ConcreteDataType::float32_datatype(), - is_nullable: true, - default_constraint: None, - }, - ColumnSchema { - name: "memory".to_string(), - data_type: ConcreteDataType::float64_datatype(), - is_nullable: true, - default_constraint: None, - }, + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; Arc::new( SchemaBuilder::try_from(column_schemas) diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 9f616da7c0..77773cb320 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -119,8 +119,7 @@ pub fn build_create_table_request( } in columns { if !new_columns.contains(column_name) { - let mut column_schema = build_column_schema(column_name, *datatype, true)?; - + let mut is_nullable = true; match *semantic_type { TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()), TIMESTAMP_SEMANTIC_TYPE => { @@ -133,11 +132,12 @@ pub fn build_create_table_request( ); timestamp_index = column_schemas.len(); // Timestamp column must not be null. - column_schema.is_nullable = false; + is_nullable = false; } _ => {} } + let column_schema = build_column_schema(column_name, *datatype, is_nullable)?; column_schemas.push(column_schema); new_columns.insert(column_name.to_string()); } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 141c3a5c7d..5ff4f87516 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -91,7 +91,7 @@ mod tests { let new_column = &columns[0].column_schema; assert_eq!(new_column.name, "tagk_i"); - assert!(new_column.is_nullable); + assert!(new_column.is_nullable()); assert_eq!(new_column.data_type, ConcreteDataType::string_datatype()); } _ => unreachable!(), diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 70ce9bcde4..fc41c8d088 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -115,6 +115,10 @@ impl ConcreteDataType { pub fn from_arrow_type(dt: &ArrowDataType) -> Self { ConcreteDataType::try_from(dt).expect("Unimplemented type") } + + pub fn is_null(&self) -> bool { + matches!(self, ConcreteDataType::Null(NullType)) + } } impl TryFrom<&ArrowDataType> for ConcreteDataType { @@ -322,7 +326,7 @@ mod tests { } #[test] - pub fn test_from_arrow_timestamp() { + fn test_from_arrow_timestamp() { assert_eq!( ConcreteDataType::timestamp_millis_datatype(), ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Millisecond) @@ -342,7 +346,7 @@ mod tests { } #[test] - pub fn test_is_timestamp() { + fn test_is_timestamp() { assert!(ConcreteDataType::timestamp_millis_datatype().is_timestamp()); assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Second).is_timestamp()); assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond).is_timestamp()); @@ -353,4 +357,10 @@ mod tests { // to be used a data type for timestamp column assert!(!ConcreteDataType::int64_datatype().is_timestamp()); } + + #[test] + fn test_is_null() { + assert!(ConcreteDataType::null_datatype().is_null()); + assert!(!ConcreteDataType::int32_datatype().is_null()); + } } diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 2e4ad9136d..185db77c2c 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -63,6 +63,18 @@ pub enum Error { source: arrow::error::ArrowError, backtrace: Backtrace, }, + + #[snafu(display("Unsupported column default constraint expression: {}", expr))] + UnsupportedDefaultExpr { expr: String, backtrace: Backtrace }, + + #[snafu(display("Default value should not be null for non null column"))] + NullDefault { backtrace: Backtrace }, + + #[snafu(display("Incompatible default value type, reason: {}", reason))] + DefaultValueType { + reason: String, + backtrace: Backtrace, + }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index a78cbff679..efe07ae95f 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1,3 +1,5 @@ +mod constraint; + use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; @@ -8,35 +10,8 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu}; -use crate::value::Value; - -/// Column's default constraint. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum ColumnDefaultConstraint { - // A function invocation - // TODO(dennis): we save the function expression here, maybe use a struct in future. - Function(String), - // A value - Value(Value), -} - -impl TryFrom<&[u8]> for ColumnDefaultConstraint { - type Error = error::Error; - - fn try_from(bytes: &[u8]) -> Result { - let json = String::from_utf8_lossy(bytes); - serde_json::from_str(&json).context(DeserializeSnafu { json }) - } -} - -impl TryInto> for ColumnDefaultConstraint { - type Error = error::Error; - - fn try_into(self) -> Result> { - let s = serde_json::to_string(&self).context(SerializeSnafu)?; - Ok(s.into_bytes()) - } -} +pub use crate::schema::constraint::ColumnDefaultConstraint; +use crate::vectors::VectorRef; /// Key used to store column name of the timestamp column in metadata. /// @@ -54,8 +29,8 @@ const ARROW_FIELD_DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint"; pub struct ColumnSchema { pub name: String, pub data_type: ConcreteDataType, - pub is_nullable: bool, - pub default_constraint: Option, + is_nullable: bool, + default_constraint: Option, } impl ColumnSchema { @@ -72,12 +47,45 @@ impl ColumnSchema { } } + #[inline] + pub fn is_nullable(&self) -> bool { + self.is_nullable + } + + #[inline] + pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> { + self.default_constraint.as_ref() + } + pub fn with_default_constraint( mut self, default_constraint: Option, - ) -> Self { + ) -> Result { + if let Some(constraint) = &default_constraint { + constraint.validate(&self.data_type, self.is_nullable)?; + } + self.default_constraint = default_constraint; - self + Ok(self) + } + + pub fn create_default_vector(&self, num_rows: usize) -> Result> { + match &self.default_constraint { + Some(c) => c + .create_default_vector(&self.data_type, self.is_nullable, num_rows) + .map(Some), + None => { + if self.is_nullable { + // No default constraint, use null as default value. + // TODO(yingwen): Use NullVector once it supports setting logical type. + ColumnDefaultConstraint::null_value() + .create_default_vector(&self.data_type, self.is_nullable, num_rows) + .map(Some) + } else { + Ok(None) + } + } + } } } @@ -327,7 +335,7 @@ impl TryFrom<&ColumnSchema> for Field { Ok(Field::new( column_schema.name.clone(), column_schema.data_type.as_arrow_type(), - column_schema.is_nullable, + column_schema.is_nullable(), ) .with_metadata(metadata)) } @@ -393,6 +401,7 @@ mod tests { use arrow::datatypes::DataType as ArrowDataType; use super::*; + use crate::value::Value; #[test] fn test_column_schema() { @@ -409,7 +418,8 @@ mod tests { #[test] fn test_column_schema_with_default_constraint() { let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99)))); + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99)))) + .unwrap(); let field = Field::try_from(&column_schema).unwrap(); assert_eq!("test", field.name); assert_eq!(ArrowDataType::Int32, field.data_type); @@ -426,6 +436,13 @@ mod tests { assert_eq!(column_schema, new_column_schema); } + #[test] + fn test_column_schema_invalid_default_constraint() { + ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false) + .with_default_constraint(Some(ColumnDefaultConstraint::null_value())) + .unwrap_err(); + } + #[test] fn test_column_default_constraint_try_into_from() { let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64)); @@ -436,6 +453,29 @@ mod tests { assert_eq!(default_constraint, from_value); } + #[test] + fn test_column_schema_create_default_null() { + // Implicit default null. + let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true); + let v = column_schema.create_default_vector(5).unwrap().unwrap(); + assert_eq!(5, v.len()); + assert!(v.only_null()); + + // Explicit default null. + let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::null_value())) + .unwrap(); + let v = column_schema.create_default_vector(5).unwrap().unwrap(); + assert_eq!(5, v.len()); + assert!(v.only_null()); + } + + #[test] + fn test_column_schema_no_default() { + let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false); + assert!(column_schema.create_default_vector(5).unwrap().is_none()); + } + #[test] fn test_build_empty_schema() { let schema = SchemaBuilder::default().build().unwrap(); diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs new file mode 100644 index 0000000000..8f54312d12 --- /dev/null +++ b/src/datatypes/src/schema/constraint.rs @@ -0,0 +1,244 @@ +use std::sync::Arc; + +use common_time::{util, Timestamp}; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; + +use crate::data_type::{ConcreteDataType, DataType}; +use crate::error::{self, Result}; +use crate::scalars::ScalarVector; +use crate::value::Value; +use crate::vectors::{ConstantVector, TimestampVector, VectorRef}; + +const CURRENT_TIMESTAMP: &str = "current_timestamp()"; + +/// Column's default constraint. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ColumnDefaultConstraint { + // A function invocation + // TODO(dennis): we save the function expression here, maybe use a struct in future. + Function(String), + // A value + Value(Value), +} + +impl TryFrom<&[u8]> for ColumnDefaultConstraint { + type Error = error::Error; + + fn try_from(bytes: &[u8]) -> Result { + let json = String::from_utf8_lossy(bytes); + serde_json::from_str(&json).context(error::DeserializeSnafu { json }) + } +} + +impl TryInto> for ColumnDefaultConstraint { + type Error = error::Error; + + fn try_into(self) -> Result> { + let s = serde_json::to_string(&self).context(error::SerializeSnafu)?; + Ok(s.into_bytes()) + } +} + +impl ColumnDefaultConstraint { + /// Returns a default null constraint. + pub fn null_value() -> ColumnDefaultConstraint { + ColumnDefaultConstraint::Value(Value::Null) + } + + /// Check whether the constraint is valid for columns with given `data_type` + /// and `is_nullable` attributes. + pub fn validate(&self, data_type: &ConcreteDataType, is_nullable: bool) -> Result<()> { + ensure!(is_nullable || !self.maybe_null(), error::NullDefaultSnafu); + + match self { + ColumnDefaultConstraint::Function(expr) => { + ensure!( + expr == CURRENT_TIMESTAMP, + error::UnsupportedDefaultExprSnafu { expr } + ); + ensure!( + data_type.is_timestamp(), + error::DefaultValueTypeSnafu { + reason: "return value of the function must has timestamp type", + } + ); + } + ColumnDefaultConstraint::Value(v) => { + if !v.is_null() { + // Whether the value could be nullable has been checked before, only need + // to check the type compatibility here. + ensure!( + data_type.logical_type_id() == v.logical_type_id(), + error::DefaultValueTypeSnafu { + reason: format!( + "column has type {:?} but default value has type {:?}", + data_type.logical_type_id(), + v.logical_type_id() + ), + } + ); + } + } + } + + Ok(()) + } + + /// Create a vector that contains `num_rows` default values for given `data_type`. + /// + /// If `is_nullable` is `true`, then this method would returns error if the created + /// default value is null. + /// + /// # Panics + /// Panics if `num_rows == 0`. + pub fn create_default_vector( + &self, + data_type: &ConcreteDataType, + is_nullable: bool, + num_rows: usize, + ) -> Result { + assert!(num_rows > 0); + + match self { + ColumnDefaultConstraint::Function(expr) => { + // Functions should also ensure its return value is not null when + // is_nullable is true. + match &expr[..] { + // TODO(dennis): we only supports current_timestamp right now, + // it's better to use a expression framework in future. + CURRENT_TIMESTAMP => { + // TODO(yingwen): We should coerce the type to the physical type of + // input `data_type`. + let vector = + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + util::current_time_millis(), + )])); + Ok(Arc::new(ConstantVector::new(vector, num_rows))) + } + _ => error::UnsupportedDefaultExprSnafu { expr }.fail(), + } + } + ColumnDefaultConstraint::Value(v) => { + ensure!(is_nullable || !v.is_null(), error::NullDefaultSnafu); + + // TODO(yingwen): + // 1. For null value, we could use NullVector once it supports custom logical type. + // 2. For non null value, we could use ConstantVector, but it would cause all codes + // attempt to downcast the vector fail if they don't check whether the vector is const + // first. + let mut mutable_vector = data_type.create_mutable_vector(1); + mutable_vector.push_value_ref(v.as_value_ref())?; + let base_vector = mutable_vector.to_vector(); + Ok(base_vector.replicate(&[num_rows])) + } + } + } + + /// Returns true if this constraint might creates NULL. + fn maybe_null(&self) -> bool { + // Once we support more functions, we may return true if given function + // could return null. + matches!(self, ColumnDefaultConstraint::Value(Value::Null)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::vectors::Int32Vector; + + #[test] + fn test_null_default_constraint() { + let constraint = ColumnDefaultConstraint::null_value(); + assert!(constraint.maybe_null()); + let constraint = ColumnDefaultConstraint::Value(Value::Int32(10)); + assert!(!constraint.maybe_null()); + } + + #[test] + fn test_validate_null_constraint() { + let constraint = ColumnDefaultConstraint::null_value(); + let data_type = ConcreteDataType::int32_datatype(); + constraint.validate(&data_type, false).unwrap_err(); + constraint.validate(&data_type, true).unwrap(); + } + + #[test] + fn test_validate_value_constraint() { + let constraint = ColumnDefaultConstraint::Value(Value::Int32(10)); + let data_type = ConcreteDataType::int32_datatype(); + constraint.validate(&data_type, false).unwrap(); + constraint.validate(&data_type, true).unwrap(); + + constraint + .validate(&ConcreteDataType::uint32_datatype(), true) + .unwrap_err(); + } + + #[test] + fn test_validate_function_constraint() { + let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string()); + constraint + .validate(&ConcreteDataType::timestamp_millis_datatype(), false) + .unwrap(); + constraint + .validate(&ConcreteDataType::boolean_datatype(), false) + .unwrap_err(); + + let constraint = ColumnDefaultConstraint::Function("hello()".to_string()); + constraint + .validate(&ConcreteDataType::timestamp_millis_datatype(), false) + .unwrap_err(); + } + + #[test] + fn test_create_default_vector_by_null() { + let constraint = ColumnDefaultConstraint::null_value(); + let data_type = ConcreteDataType::int32_datatype(); + constraint + .create_default_vector(&data_type, false, 10) + .unwrap_err(); + + let constraint = ColumnDefaultConstraint::null_value(); + let v = constraint + .create_default_vector(&data_type, true, 3) + .unwrap(); + assert_eq!(3, v.len()); + for i in 0..v.len() { + assert_eq!(Value::Null, v.get(i)); + } + } + + #[test] + fn test_create_default_vector_by_value() { + let constraint = ColumnDefaultConstraint::Value(Value::Int32(10)); + let data_type = ConcreteDataType::int32_datatype(); + let v = constraint + .create_default_vector(&data_type, false, 4) + .unwrap(); + let expect: VectorRef = Arc::new(Int32Vector::from_values(vec![10; 4])); + assert_eq!(expect, v); + } + + #[test] + fn test_create_default_vector_by_func() { + let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string()); + let data_type = ConcreteDataType::timestamp_millis_datatype(); + let v = constraint + .create_default_vector(&data_type, false, 4) + .unwrap(); + assert_eq!(4, v.len()); + assert!( + matches!(v.get(0), Value::Timestamp(_)), + "v {:?} is not timestamp", + v.get(0) + ); + + let constraint = ColumnDefaultConstraint::Function("no".to_string()); + let data_type = ConcreteDataType::timestamp_millis_datatype(); + constraint + .create_default_vector(&data_type, false, 4) + .unwrap_err(); + } +} diff --git a/src/datatypes/src/types/binary_type.rs b/src/datatypes/src/types/binary_type.rs index 5da23b1ab1..5cbe9330b4 100644 --- a/src/datatypes/src/types/binary_type.rs +++ b/src/datatypes/src/types/binary_type.rs @@ -25,7 +25,7 @@ impl DataType for BinaryType { } fn logical_type_id(&self) -> LogicalTypeId { - LogicalTypeId::String + LogicalTypeId::Binary } fn default_value(&self) -> Value { diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 3c99a88a2b..b1f8eb72ae 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use crate::error::{self, Result}; use crate::prelude::*; +use crate::type_id::LogicalTypeId; use crate::vectors::ListVector; pub type OrderedF32 = OrderedFloat; @@ -69,7 +70,7 @@ impl Value { Value::Binary(_) => ConcreteDataType::binary_datatype(), Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), Value::Date(_) => ConcreteDataType::date_datatype(), - Value::DateTime(_) => ConcreteDataType::date_datatype(), + Value::DateTime(_) => ConcreteDataType::datetime_datatype(), Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()), } } @@ -114,6 +115,30 @@ impl Value { Value::Timestamp(v) => ValueRef::Timestamp(*v), } } + + /// Returns the logical type of the value. + pub fn logical_type_id(&self) -> LogicalTypeId { + match self { + Value::Null => LogicalTypeId::Null, + Value::Boolean(_) => LogicalTypeId::Boolean, + Value::UInt8(_) => LogicalTypeId::UInt8, + Value::UInt16(_) => LogicalTypeId::UInt16, + Value::UInt32(_) => LogicalTypeId::UInt32, + Value::UInt64(_) => LogicalTypeId::UInt64, + Value::Int8(_) => LogicalTypeId::Int8, + Value::Int16(_) => LogicalTypeId::Int16, + Value::Int32(_) => LogicalTypeId::Int32, + Value::Int64(_) => LogicalTypeId::Int64, + Value::Float32(_) => LogicalTypeId::Float32, + Value::Float64(_) => LogicalTypeId::Float64, + Value::String(_) => LogicalTypeId::String, + Value::Binary(_) => LogicalTypeId::Binary, + Value::List(_) => LogicalTypeId::List, + Value::Date(_) => LogicalTypeId::Date, + Value::DateTime(_) => LogicalTypeId::DateTime, + Value::Timestamp(_) => LogicalTypeId::Timestamp, + } + } } macro_rules! impl_ord_for_value_like { @@ -582,68 +607,69 @@ mod tests { assert_eq!(Value::Binary(bytes.clone()), Value::from(bytes)); } + fn check_type_and_value(data_type: &ConcreteDataType, value: &Value) { + assert_eq!(*data_type, value.data_type()); + assert_eq!(data_type.logical_type_id(), value.logical_type_id()); + } + #[test] fn test_value_datatype() { - assert_eq!( - ConcreteDataType::boolean_datatype(), - Value::Boolean(true).data_type() + check_type_and_value(&ConcreteDataType::boolean_datatype(), &Value::Boolean(true)); + check_type_and_value(&ConcreteDataType::uint8_datatype(), &Value::UInt8(u8::MIN)); + check_type_and_value( + &ConcreteDataType::uint16_datatype(), + &Value::UInt16(u16::MIN), ); - assert_eq!( - ConcreteDataType::uint8_datatype(), - Value::UInt8(u8::MIN).data_type() + check_type_and_value( + &ConcreteDataType::uint16_datatype(), + &Value::UInt16(u16::MAX), ); - assert_eq!( - ConcreteDataType::uint16_datatype(), - Value::UInt16(u16::MIN).data_type() + check_type_and_value( + &ConcreteDataType::uint32_datatype(), + &Value::UInt32(u32::MIN), ); - assert_eq!( - ConcreteDataType::uint16_datatype(), - Value::UInt16(u16::MAX).data_type() + check_type_and_value( + &ConcreteDataType::uint64_datatype(), + &Value::UInt64(u64::MIN), ); - assert_eq!( - ConcreteDataType::uint32_datatype(), - Value::UInt32(u32::MIN).data_type() + check_type_and_value(&ConcreteDataType::int8_datatype(), &Value::Int8(i8::MIN)); + check_type_and_value(&ConcreteDataType::int16_datatype(), &Value::Int16(i16::MIN)); + check_type_and_value(&ConcreteDataType::int32_datatype(), &Value::Int32(i32::MIN)); + check_type_and_value(&ConcreteDataType::int64_datatype(), &Value::Int64(i64::MIN)); + check_type_and_value( + &ConcreteDataType::float32_datatype(), + &Value::Float32(OrderedFloat(f32::MIN)), ); - assert_eq!( - ConcreteDataType::uint64_datatype(), - Value::UInt64(u64::MIN).data_type() + check_type_and_value( + &ConcreteDataType::float64_datatype(), + &Value::Float64(OrderedFloat(f64::MIN)), ); - assert_eq!( - ConcreteDataType::int8_datatype(), - Value::Int8(i8::MIN).data_type() + check_type_and_value( + &ConcreteDataType::string_datatype(), + &Value::String(StringBytes::from("hello")), ); - assert_eq!( - ConcreteDataType::int16_datatype(), - Value::Int16(i16::MIN).data_type() + check_type_and_value( + &ConcreteDataType::binary_datatype(), + &Value::Binary(Bytes::from(b"world".as_slice())), ); - assert_eq!( - ConcreteDataType::int32_datatype(), - Value::Int32(i32::MIN).data_type() + check_type_and_value( + &ConcreteDataType::list_datatype(ConcreteDataType::int32_datatype()), + &Value::List(ListValue::new( + Some(Box::new(vec![Value::Int32(10)])), + ConcreteDataType::int32_datatype(), + )), ); - assert_eq!( - ConcreteDataType::int64_datatype(), - Value::Int64(i64::MIN).data_type() + check_type_and_value( + &ConcreteDataType::date_datatype(), + &Value::Date(Date::new(1)), ); - assert_eq!( - ConcreteDataType::float32_datatype(), - Value::Float32(OrderedFloat(f32::MIN)).data_type(), + check_type_and_value( + &ConcreteDataType::datetime_datatype(), + &Value::DateTime(DateTime::new(1)), ); - assert_eq!( - ConcreteDataType::float64_datatype(), - Value::Float64(OrderedFloat(f64::MIN)).data_type(), - ); - assert_eq!( - ConcreteDataType::string_datatype(), - Value::String(StringBytes::from("hello")).data_type(), - ); - assert_eq!( - ConcreteDataType::binary_datatype(), - Value::Binary(Bytes::from(b"world".as_slice())).data_type() - ); - - assert_eq!( - ConcreteDataType::timestamp_millis_datatype(), - Value::Timestamp(Timestamp::from_millis(1)).data_type() + check_type_and_value( + &ConcreteDataType::timestamp_millis_datatype(), + &Value::Timestamp(Timestamp::from_millis(1)), ); } diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs index ede948ca02..a35e17cf67 100644 --- a/src/datatypes/src/vectors/operations.rs +++ b/src/datatypes/src/vectors/operations.rs @@ -12,7 +12,8 @@ use crate::vectors::{Vector, VectorRef}; /// Vector compute operations. pub trait VectorOp { /// Copies each element according `offsets` parameter. - /// (`i-th` element should be copied `offsets[i] - offsets[i - 1]` times.) + /// - `i-th` element should be copied `offsets[i] - offsets[i - 1]` times + /// - `0-th` element would be copied `offsets[0]` times /// /// # Panics /// Panics if `offsets.len() != self.len()`. diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 0f7779b1ff..82fb0c43e2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -216,8 +216,8 @@ fn columns_to_expr(column_defs: &[ColumnDef]) -> Result> { Ok(GrpcColumnDef { name: schema.name.clone(), datatype: datatype as i32, - is_nullable: schema.is_nullable, - default_constraint: match &schema.default_constraint { + is_nullable: schema.is_nullable(), + default_constraint: match schema.default_constraint() { None => None, Some(v) => Some(v.clone().try_into().context( ConvertColumnDefaultConstraintSnafu { diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 6f9f75bcbd..632a50ec6b 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -82,6 +82,13 @@ pub enum Error { #[snafu(display("Invalid database name: {}", name))] InvalidDatabaseName { name: String, backtrace: Backtrace }, + + #[snafu(display("Invalid default constraint, column: {}, source: {}", column, source))] + InvalidDefault { + column: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } impl ErrorExt for Error { @@ -96,7 +103,8 @@ impl ErrorExt for Error { | Tokenizer { .. } | InvalidSql { .. } | ParseSqlValue { .. } - | SqlTypeNotSupported { .. } => StatusCode::InvalidSyntax, + | SqlTypeNotSupported { .. } + | InvalidDefault { .. } => StatusCode::InvalidSyntax, InvalidDatabaseName { .. } | ColumnTypeMismatch { .. } => StatusCode::InvalidArguments, } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 4921bf1bd0..0229b547a2 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -12,7 +12,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use datatypes::types::DateTimeType; use datatypes::value::Value; -use snafu::ensure; +use snafu::{ensure, ResultExt}; use crate::ast::{ ColumnDef, ColumnOption, ColumnOptionDef, DataType as SqlDataType, Expr, ObjectName, @@ -209,6 +209,8 @@ fn parse_column_default_constraint( } } +// TODO(yingwen): Make column nullable by default, and checks invalid case like +// a column is not nullable but has a default value null. /// Create a `ColumnSchema` from `ColumnDef`. pub fn column_def_to_schema(column_def: &ColumnDef) -> Result { let is_nullable = column_def @@ -221,12 +223,11 @@ pub fn column_def_to_schema(column_def: &ColumnDef) -> Result { let default_constraint = parse_column_default_constraint(&name, &data_type, &column_def.options)?; - Ok(ColumnSchema { - name, - data_type, - is_nullable, - default_constraint, - }) + ColumnSchema::new(name, data_type, is_nullable) + .with_default_constraint(default_constraint) + .context(error::InvalidDefaultSnafu { + column: &column_def.name.value, + }) } fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { @@ -386,7 +387,7 @@ mod tests { &SqlValue::DoubleQuotedString("2022-02-22 00:01:03".to_string()), ) .unwrap(); - assert_eq!(ConcreteDataType::date_datatype(), value.data_type()); + assert_eq!(ConcreteDataType::datetime_datatype(), value.data_type()); if let Value::DateTime(d) = value { assert_eq!("2022-02-22 00:01:03", d.to_string()); } else { diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index a4d625c3c5..368a179184 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -22,12 +22,6 @@ pub enum Error { source: MetadataError, }, - #[snafu(display("Invalid schema of input data, region: {}", region))] - InvalidInputSchema { - region: String, - backtrace: Backtrace, - }, - #[snafu(display("Missing column {} in write batch", column))] BatchMissingColumn { column: String, @@ -132,7 +126,10 @@ pub enum Error { }, #[snafu(display("Invalid timestamp in write batch, source: {}", source))] - InvalidTimestamp { source: crate::write_batch::Error }, + InvalidTimestamp { + #[snafu(backtrace)] + source: crate::write_batch::Error, + }, #[snafu(display("Task already cancelled"))] Cancelled { backtrace: Backtrace }, @@ -269,6 +266,35 @@ pub enum Error { #[snafu(backtrace)] source: MetadataError, }, + + #[snafu(display( + "Failed to add default value for column {}, source: {}", + column, + source + ))] + AddDefault { + column: String, + source: crate::write_batch::Error, + }, + + #[snafu(display( + "Not allowed to write data with version {} to schema with version {}", + data_version, + schema_version + ))] + WriteToOldVersion { + /// Schema version of data to write. + data_version: u32, + schema_version: u32, + backtrace: Backtrace, + }, + + #[snafu(display("Column {} not in schema with version {}", column, version))] + NotInSchemaToCompat { + column: String, + version: u32, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -279,12 +305,13 @@ impl ErrorExt for Error { match self { InvalidScanIndex { .. } - | InvalidInputSchema { .. } | BatchMissingColumn { .. } | BatchMissingTimestamp { .. } | InvalidTimestamp { .. } | InvalidProjection { .. } - | BuildBatch { .. } => StatusCode::InvalidArguments, + | BuildBatch { .. } + | NotInSchemaToCompat { .. } + | WriteToOldVersion { .. } => StatusCode::InvalidArguments, Utf8 { .. } | EncodeJson { .. } @@ -322,6 +349,7 @@ impl ErrorExt for Error { source.status_code() } PushBatch { source, .. } => source.status_code(), + AddDefault { source, .. } => source.status_code(), } } diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 639c000919..2b8a33ccef 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -45,7 +45,11 @@ pub struct RawColumnFamiliesMetadata { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionChange { + /// The committed sequence of the region when this change happens. So the + /// data with sequence **greater than** this sequence would use the new + /// metadata. pub committed_sequence: SequenceNumber, + /// The metadata after changed. pub metadata: RawRegionMetadata, } diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 67f837fcbe..b5d4fdd170 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -1,12 +1,11 @@ use std::collections::HashMap; -use std::sync::Arc; use std::time::Duration; use common_time::{RangeMillis, TimestampMillis}; use datatypes::prelude::ScalarVector; use datatypes::schema::SchemaRef; -use datatypes::vectors::{NullVector, TimestampVector, VectorRef}; -use snafu::{ensure, OptionExt}; +use datatypes::vectors::{TimestampVector, VectorRef}; +use snafu::OptionExt; use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber}; use crate::error::{self, Result}; @@ -60,6 +59,9 @@ impl Inserter { return Ok(()); } + // Only validate schema in debug mod. + validate_input_and_memtable_schemas(batch, memtables); + // Enough to hold all key or value columns. let total_column_num = batch.schema().num_columns(); // Reusable KeyValues buffer. @@ -164,6 +166,18 @@ impl Inserter { } } +#[cfg(debug_assertions)] +fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtables: &MemtableSet) { + let batch_schema = batch.schema(); + for (_, memtable) in memtables.iter() { + let memtable_schema = memtable.schema(); + let user_schema = memtable_schema.user_schema(); + debug_assert_eq!(batch_schema.version(), user_schema.version()); + // Only validate column schemas. + debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas()); + } +} + fn new_range_index_map(time_ranges: &[RangeMillis]) -> RangeIndexMap { time_ranges .iter() @@ -177,18 +191,10 @@ fn clone_put_data_column_to( desc: &ColumnDescriptor, target: &mut Vec, ) -> Result<()> { - if let Some(vector) = put_data.column_by_name(&desc.name) { - target.push(vector.clone()); - } else { - // The write batch should have been validated before. - ensure!( - desc.is_nullable, - error::BatchMissingColumnSnafu { column: &desc.name } - ); - - let num_rows = put_data.num_rows(); - target.push(Arc::new(NullVector::new(num_rows))); - } + let vector = put_data + .column_by_name(&desc.name) + .context(error::BatchMissingColumnSnafu { column: &desc.name })?; + target.push(vector.clone()); Ok(()) } @@ -289,6 +295,8 @@ fn compute_slice_indexes( #[cfg(test)] mod tests { + use std::sync::Arc; + use common_time::timestamp::Timestamp; use datatypes::prelude::ScalarVectorBuilder; use datatypes::vectors::{Int64Vector, TimestampVector, TimestampVectorBuilder}; diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index 3085becc75..a967c0aa0c 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -202,7 +202,7 @@ impl RegionMetadata { // We don't check the case that the column is not nullable but default constraint is null. The // caller should guarantee this. ensure!( - add_column.desc.is_nullable || add_column.desc.default_constraint.is_some(), + add_column.desc.is_nullable() || add_column.desc.default_constraint().is_some(), AddNonNullColumnSnafu { name: &add_column.desc.name, } diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index b4810a18b8..7893aa9110 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -98,7 +98,7 @@ impl From<&schema::ColumnSchema> for ColumnSchema { Self { name: cs.name.clone(), data_type: DataType::from(&cs.data_type).into(), - is_nullable: cs.is_nullable, + is_nullable: cs.is_nullable(), } } } @@ -140,7 +140,12 @@ impl From<&ConcreteDataType> for DataType { ConcreteDataType::Null(_) => DataType::Null, ConcreteDataType::Binary(_) => DataType::Binary, ConcreteDataType::Timestamp(_) => DataType::Timestamp, - _ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc. + ConcreteDataType::Date(_) + | ConcreteDataType::DateTime(_) + | ConcreteDataType::List(_) => { + // TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc. + unimplemented!("data type {:?} is not supported", data_type) + } } } } @@ -276,7 +281,8 @@ gen_put_data!(string, StringVectorBuilder, v, v.as_str()); gen_put_data!(timestamp, TimestampVectorBuilder, v, (*v).into()); pub fn gen_columns(vector: &VectorRef) -> Result { - match vector.data_type() { + let data_type = vector.data_type(); + match data_type { ConcreteDataType::Boolean(_) => gen_columns_bool(vector), ConcreteDataType::Int8(_) => gen_columns_i8(vector), ConcreteDataType::Int16(_) => gen_columns_i16(vector), @@ -291,8 +297,12 @@ pub fn gen_columns(vector: &VectorRef) -> Result { ConcreteDataType::Binary(_) => gen_columns_binary(vector), ConcreteDataType::String(_) => gen_columns_string(vector), ConcreteDataType::Timestamp(_) => gen_columns_timestamp(vector), - _ => { - unimplemented!() // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc. + ConcreteDataType::Null(_) + | ConcreteDataType::Date(_) + | ConcreteDataType::DateTime(_) + | ConcreteDataType::List(_) => { + // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc. + unimplemented!("data type {:?} is not supported", data_type) } } } @@ -313,6 +323,12 @@ pub fn gen_put_data_vector(data_type: ConcreteDataType, column: Column) -> Resul ConcreteDataType::Binary(_) => gen_put_data_binary(column), ConcreteDataType::String(_) => gen_put_data_string(column), ConcreteDataType::Timestamp(_) => gen_put_data_timestamp(column), - _ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc. + ConcreteDataType::Null(_) + | ConcreteDataType::Date(_) + | ConcreteDataType::DateTime(_) + | ConcreteDataType::List(_) => { + // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc. + unimplemented!("data type {:?} is not supported", data_type) + } } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 923bcbb9cc..c05a46a002 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_telemetry::logging; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ @@ -23,6 +23,7 @@ use crate::manifest::{ use crate::memtable::MemtableBuilderRef; use crate::metadata::{RegionMetaImpl, RegionMetadata}; pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext}; +use crate::schema::compat::CompatWrite; use crate::snapshot::SnapshotImpl; use crate::sst::AccessLayerRef; use crate::version::VersionEdit; @@ -63,7 +64,10 @@ impl Region for RegionImpl { self.inner.in_memory_metadata() } - async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result { + async fn write(&self, ctx: &WriteContext, mut request: WriteBatch) -> Result { + // Compat the schema of the write batch outside of the write lock. + self.inner.compat_write_batch(&mut request)?; + self.inner.write(ctx, request).await } @@ -320,6 +324,11 @@ impl RegionImpl { async fn wait_flush_done(&self) -> Result<()> { self.inner.writer.wait_flush_done().await } + + /// Write to inner, also the `RegionWriter` directly. + async fn write_inner(&self, ctx: &WriteContext, request: WriteBatch) -> Result { + self.inner.write(ctx, request).await + } } /// Shared data of region. @@ -377,19 +386,17 @@ impl RegionInner { SnapshotImpl::new(version, sequence, self.sst_layer.clone()) } - async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result { - // FIXME(yingwen): [alter] The schema may be outdated. - let metadata = self.in_memory_metadata(); + fn compat_write_batch(&self, request: &mut WriteBatch) -> Result<()> { + let metadata = self.version_control().metadata(); let schema = metadata.schema(); - // Only compare column schemas. - ensure!( - schema.column_schemas() == request.schema().column_schemas(), - error::InvalidInputSchemaSnafu { - region: &self.shared.name - } - ); + // Try to make request schema compatible with region's outside of write lock. Note that + // schema might be altered after this step. + request.compat_write(schema.user_schema()) + } + /// Write to writer directly. + async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result { let writer_ctx = WriterContext { shared: &self.shared, flush_strategy: &self.flush_strategy, @@ -399,12 +406,18 @@ impl RegionInner { writer: &self.writer, manifest: &self.manifest, }; - // Now altering schema is not allowed, so it is safe to validate schema outside of the lock. + // The writer would also try to compat the schema of write batch if it finds out the + // schema version of request is less than current schema version. self.writer.write(ctx, request, writer_ctx).await } async fn alter(&self, request: AlterRequest) -> Result<()> { - // TODO(yingwen): [alter] Log the request. + logging::info!( + "Alter region {}, name: {}, request: {:?}", + self.shared.id, + self.shared.name, + request + ); let alter_ctx = AlterContext { shared: &self.shared, diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 48c4e4ea05..ae7956cbf5 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -64,6 +64,20 @@ impl TesterBase { self.region.write(&self.write_ctx, batch).await.unwrap() } + /// Put without version specified directly to inner writer. + pub async fn put_inner(&self, data: &[(i64, Option)]) -> WriteResponse { + let data: Vec<(Timestamp, Option)> = + data.iter().map(|(l, r)| ((*l).into(), *r)).collect(); + let mut batch = new_write_batch_for_test(false); + let put_data = new_put_data(&data); + batch.put(put_data).unwrap(); + + self.region + .write_inner(&self.write_ctx, batch) + .await + .unwrap() + } + /// Scan all data. pub async fn full_scan(&self) -> Vec<(i64, Option)> { logging::info!("Full scan with ctx {:?}", self.read_ctx); diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs index fefef0c430..94923546ac 100644 --- a/src/storage/src/region/tests/alter.rs +++ b/src/storage/src/region/tests/alter.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ScalarVector; -use datatypes::type_id::LogicalTypeId; use datatypes::vectors::Int64Vector; use datatypes::vectors::TimestampVector; use log_store::fs::log::LocalFileLogStore; @@ -18,10 +17,9 @@ use tempdir::TempDir; use crate::region::tests::{self, FileTesterBase}; use crate::region::OpenOptions; use crate::region::RegionImpl; +use crate::test_util; use crate::test_util::config_util; -use crate::test_util::{self, write_batch_util}; use crate::write_batch::PutData; -use crate::write_batch::WriteBatch; const REGION_NAME: &str = "region-alter-0"; @@ -58,18 +56,6 @@ impl DataRow { } } -fn new_write_batch_for_test() -> WriteBatch { - write_batch_util::new_write_batch( - &[ - ("k0", LogicalTypeId::Int64, true), - (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false), - ("v0", LogicalTypeId::Int64, true), - ("v1", LogicalTypeId::Int64, true), - ], - Some(1), - ) -} - fn new_put_data(data: &[DataRow]) -> PutData { let mut put_data = PutData::with_num_columns(4); @@ -122,8 +108,9 @@ impl AlterTester { metadata.schema().clone() } + // Put with schema k0, ts, v0, v1 async fn put(&self, data: &[DataRow]) -> WriteResponse { - let mut batch = new_write_batch_for_test(); + let mut batch = self.base().region.write_request(); let put_data = new_put_data(data); batch.put(put_data).unwrap(); @@ -135,10 +122,17 @@ impl AlterTester { } /// Put data with initial schema. - async fn put_before_alter(&self, data: &[(i64, Option)]) { + async fn put_with_init_schema(&self, data: &[(i64, Option)]) { + // put of FileTesterBase always use initial schema version. self.base().put(data).await; } + /// Put data to inner writer with initial schema. + async fn put_inner_with_init_schema(&self, data: &[(i64, Option)]) { + // put of FileTesterBase always use initial schema version. + self.base().put_inner(data).await; + } + async fn alter(&self, mut req: AlterRequest) { let version = self.version(); req.version = version; @@ -200,13 +194,14 @@ fn check_schema_names(schema: &SchemaRef, names: &[&str]) { #[tokio::test] async fn test_alter_region_with_reopen() { common_telemetry::init_default_ut_logging(); + let dir = TempDir::new("alter-region").unwrap(); let store_dir = dir.path().to_str().unwrap(); let mut tester = AlterTester::new(store_dir).await; let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))]; - tester.put_before_alter(&data).await; + tester.put_with_init_schema(&data).await; assert_eq!(3, tester.full_scan().await.len()); let schema = tester.schema(); @@ -265,7 +260,7 @@ async fn test_alter_region() { let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))]; - tester.put_before_alter(&data).await; + tester.put_with_init_schema(&data).await; let schema = tester.schema(); check_schema_names(&schema, &["timestamp", "v0"]); @@ -295,3 +290,29 @@ async fn test_alter_region() { let schema = tester.schema(); check_schema_names(&schema, &["k0", "timestamp", "v2", "v3"]); } + +#[tokio::test] +async fn test_put_old_schema_after_alter() { + let dir = TempDir::new("put-old").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let tester = AlterTester::new(store_dir).await; + + let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))]; + + tester.put_with_init_schema(&data).await; + + let req = add_column_req(&[ + (new_column_desc(4, "k0"), true), // key column k0 + (new_column_desc(5, "v1"), false), // value column v1 + ]); + tester.alter(req).await; + + // Put with old schema. + let data = vec![(1003, Some(103)), (1004, Some(104))]; + tester.put_with_init_schema(&data).await; + + // Put data with old schema directly to the inner writer, to check that the region + // writer could compat the schema of write batch. + let data = vec![(1003, Some(103)), (1004, Some(104))]; + tester.put_inner_with_init_schema(&data).await; +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index a872b979d4..fd1edf0a37 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -18,6 +18,7 @@ use crate::manifest::action::{ use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet}; use crate::proto::wal::WalHeader; use crate::region::{RecoveredMetadataMap, RegionManifest, SharedDataRef}; +use crate::schema::compat::CompatWrite; use crate::sst::AccessLayerRef; use crate::version::{VersionControlRef, VersionEdit}; use crate::wal::{Payload, Wal}; @@ -153,16 +154,25 @@ impl RegionWriter { // Acquire the version lock before altering the metadata. let _lock = self.version_mutex.lock().await; + let committed_sequence = version_control.committed_sequence(); let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata: raw, - committed_sequence: version_control.committed_sequence(), + committed_sequence, })); let new_metadata = Arc::new(new_metadata); // Persist the meta action. let prev_version = version_control.current_manifest_version(); action_list.set_prev_version(prev_version); + + logging::debug!( + "Try to alter schema of region {}, region_id: {}, action_list: {:?}", + new_metadata.name(), + new_metadata.id(), + action_list + ); + let manifest_version = alter_ctx.manifest.update(action_list).await?; // Now we could switch memtables and apply the new metadata to the version. @@ -181,6 +191,8 @@ impl RegionWriter { version_control: &VersionControlRef, manifest_version: ManifestVersion, ) -> Result<()> { + // We always bump the committed sequence regardless whether persisting the manifest version + // to wal is success, to avoid RegionMetaAction use same committed sequence in accident. let next_sequence = version_control.committed_sequence() + 1; version_control.set_committed_sequence(next_sequence); @@ -188,8 +200,6 @@ impl RegionWriter { wal.write_to_wal(next_sequence, header, Payload::None) .await?; - version_control.set_committed_sequence(next_sequence); - Ok(()) } } @@ -261,20 +271,26 @@ impl WriterInner { &mut self, version_mutex: &Mutex<()>, _ctx: &WriteContext, - request: WriteBatch, + mut request: WriteBatch, writer_ctx: WriterContext<'_, S>, ) -> Result { let time_ranges = self.preprocess_write(&request, &writer_ctx).await?; - - // TODO(yingwen): Write wal and get sequence. let version_control = writer_ctx.version_control(); - let version = version_control.current(); let _lock = version_mutex.lock().await; + + let metadata = version_control.metadata(); + // We need to check the schema again since it might has been altered. We need + // to compat request's schema before writing it into the WAL otherwise some + // default constraint like `current_timestamp()` would yield different value + // during replay. + request.compat_write(metadata.schema().user_schema())?; + let committed_sequence = version_control.committed_sequence(); // Sequence for current write batch. let next_sequence = committed_sequence + 1; + let version = version_control.current(); let wal_header = WalHeader::with_last_manifest_version(version.manifest_version()); writer_ctx .wal @@ -318,10 +334,11 @@ impl WriterInner { // should be flushed_sequence + 1. let mut stream = writer_ctx.wal.read_from_wal(flushed_sequence + 1).await?; while let Some((req_sequence, _header, request)) = stream.try_next().await? { - while let Some((next_apply_sequence, _)) = next_apply_metadata { - if req_sequence >= next_apply_sequence { - // It's safe to unwrap here. It's checked above. - // Move out metadata to avoid cloning it. + while let Some((sequence_before_alter, _)) = next_apply_metadata { + // There might be multiple metadata changes to be applied, so a loop is necessary. + if req_sequence > sequence_before_alter { + // This is the first request that use the new metadata. + // It's safe to unwrap here. It's checked above. Move out metadata to avoid cloning it. let (_, (manifest_version, metadata)) = next_apply_metadata.take().unwrap(); version_control.freeze_mutable_and_apply_metadata( Arc::new(metadata.try_into().context( @@ -332,13 +349,15 @@ impl WriterInner { manifest_version, ); num_recovered_metadata += 1; - logging::debug!("Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ", - writer_ctx.shared.name, - next_apply_sequence, - manifest_version); + logging::debug!( + "Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ", + writer_ctx.shared.name, + sequence_before_alter, + manifest_version + ); next_apply_metadata = recovered_metadata.pop_first(); } else { - // Keep the next_apply_metadata until req_sequence >= next_apply_sequence + // Keep the next_apply_metadata until req_sequence > sequence_before_alter break; } } diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index 0796020b6f..549d3ec270 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -1,3 +1,5 @@ +pub mod compat; + use std::cmp::Ordering; use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; diff --git a/src/storage/src/schema/compat.rs b/src/storage/src/schema/compat.rs new file mode 100644 index 0000000000..2967a20a54 --- /dev/null +++ b/src/storage/src/schema/compat.rs @@ -0,0 +1,16 @@ +//! Utilities for resolving schema compatibility problems. + +use datatypes::schema::SchemaRef; + +use crate::error::Result; + +/// Make schema compatible to write to target with another schema. +pub trait CompatWrite { + /// Makes the schema of `self` compatible with `dest_schema`. + /// + /// For column in `dest_schema` but not in `self`, this method would insert a + /// vector with default value. + /// + /// If there are columns not in `dest_schema`, an error would be returned. + fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()>; +} diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 6994603dd7..0bc64c2b86 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -1,3 +1,5 @@ +mod compat; + use std::{ any::Any, collections::{BTreeSet, HashMap}, @@ -7,13 +9,14 @@ use std::{ use common_error::prelude::*; use common_time::{RangeMillis, TimestampMillis}; +use datatypes::schema::{ColumnSchema, SchemaRef}; use datatypes::vectors::TimestampVector; use datatypes::{ arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector, prelude::Value, - schema::SchemaRef, vectors::VectorRef, + vectors::VectorRef, }; use prost::{DecodeError, EncodeError}; -use snafu::ensure; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{consts, PutOperation, WriteRequest}; use crate::proto; @@ -127,6 +130,17 @@ pub enum Error { source: proto::write_batch::Error, backtrace: Backtrace, }, + + #[snafu(display( + "Failed to create default value for column {}, source: {}", + name, + source + ))] + CreateDefault { + name: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -155,26 +169,16 @@ pub struct WriteBatch { num_rows: usize, } -impl WriteBatch { - pub fn new(schema: SchemaRef) -> Self { - Self { - schema, - mutations: Vec::new(), - num_rows: 0, - } - } -} - impl WriteRequest for WriteBatch { type Error = Error; type PutOp = PutData; - fn put(&mut self, data: PutData) -> Result<()> { + fn put(&mut self, mut data: PutData) -> Result<()> { if data.is_empty() { return Ok(()); } - self.validate_put(&data)?; + self.preprocess_put_data(&mut data)?; self.add_num_rows(data.num_rows())?; self.mutations.push(Mutation::Put(data)); @@ -254,6 +258,14 @@ fn align_timestamp(ts: i64, duration: i64) -> Option { // WriteBatch pub methods. impl WriteBatch { + pub fn new(schema: SchemaRef) -> Self { + Self { + schema, + mutations: Vec::new(), + num_rows: 0, + } + } + pub fn schema(&self) -> &SchemaRef { &self.schema } @@ -267,6 +279,7 @@ impl WriteBatch { } } +/// Enum to wrap different operations. pub enum Mutation { Put(PutData), } @@ -286,6 +299,47 @@ impl PutData { columns: HashMap::with_capacity(num_columns), } } + + fn add_column_by_name(&mut self, name: &str, vector: VectorRef) -> Result<()> { + ensure!( + !self.columns.contains_key(name), + DuplicateColumnSnafu { name } + ); + + if let Some(col) = self.columns.values().next() { + ensure!( + col.len() == vector.len(), + LenNotEqualsSnafu { + name, + expect: col.len(), + given: vector.len(), + } + ); + } + + self.columns.insert(name.to_string(), vector); + + Ok(()) + } + + /// Add columns by its default value. + fn add_default_by_name(&mut self, column_schema: &ColumnSchema) -> Result<()> { + let num_rows = self.num_rows(); + + // If column is not provided, fills it by default value. + let vector = column_schema + .create_default_vector(num_rows) + .context(CreateDefaultSnafu { + name: &column_schema.name, + })? + .context(MissingColumnSnafu { + name: &column_schema.name, + })?; + + validate_column(column_schema, &vector)?; + + self.add_column_by_name(&column_schema.name, vector) + } } impl PutOperation for PutData { @@ -296,7 +350,6 @@ impl PutOperation for PutData { } fn add_version_column(&mut self, vector: VectorRef) -> Result<()> { - // TODO(yingwen): Maybe ensure that version column must be a uint64 vector. self.add_column_by_name(consts::VERSION_COLUMN_NAME, vector) } @@ -349,34 +402,42 @@ impl PutData { } } +fn validate_column(column_schema: &ColumnSchema, col: &VectorRef) -> Result<()> { + if !col.data_type().is_null() { + // This allow us to use NullVector for columns that only have null value. + // TODO(yingwen): Let NullVector supports different logical type so we could + // check data type directly. + ensure!( + col.data_type() == column_schema.data_type, + TypeMismatchSnafu { + name: &column_schema.name, + expect: column_schema.data_type.clone(), + given: col.data_type(), + } + ); + } + + ensure!( + column_schema.is_nullable() || col.null_count() == 0, + HasNullSnafu { + name: &column_schema.name, + } + ); + + Ok(()) +} + impl WriteBatch { - fn validate_put(&self, data: &PutData) -> Result<()> { + /// Validate [PutData] and fill missing columns by default value. + fn preprocess_put_data(&self, data: &mut PutData) -> Result<()> { for column_schema in self.schema.column_schemas() { match data.column_by_name(&column_schema.name) { Some(col) => { - ensure!( - col.data_type() == column_schema.data_type, - TypeMismatchSnafu { - name: &column_schema.name, - expect: column_schema.data_type.clone(), - given: col.data_type(), - } - ); - - ensure!( - column_schema.is_nullable || col.null_count() == 0, - HasNullSnafu { - name: &column_schema.name, - } - ); + validate_column(column_schema, col)?; } None => { - ensure!( - column_schema.is_nullable, - MissingColumnSnafu { - name: &column_schema.name, - } - ); + // If column is not provided, fills it by default value. + data.add_default_by_name(column_schema)?; } } } @@ -412,30 +473,6 @@ impl<'a> IntoIterator for &'a WriteBatch { } } -impl PutData { - fn add_column_by_name(&mut self, name: &str, vector: VectorRef) -> Result<()> { - ensure!( - !self.columns.contains_key(name), - DuplicateColumnSnafu { name } - ); - - if let Some(col) = self.columns.values().next() { - ensure!( - col.len() == vector.len(), - LenNotEqualsSnafu { - name, - expect: col.len(), - given: vector.len(), - } - ); - } - - self.columns.insert(name.to_string(), vector); - - Ok(()) - } -} - pub mod codec { use std::{io::Cursor, sync::Arc}; @@ -1173,8 +1210,7 @@ mod tests { let encoder = codec::WriteBatchProtobufEncoder {}; let mut dst = vec![]; - let result = encoder.encode(&batch, &mut dst); - assert!(result.is_ok()); + encoder.encode(&batch, &mut dst).unwrap(); let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras); let result = decoder.decode(&dst); diff --git a/src/storage/src/write_batch/compat.rs b/src/storage/src/write_batch/compat.rs new file mode 100644 index 0000000000..a456448009 --- /dev/null +++ b/src/storage/src/write_batch/compat.rs @@ -0,0 +1,199 @@ +use datatypes::schema::{ColumnSchema, SchemaRef}; +use snafu::{ensure, ResultExt}; + +use crate::error::{self, Result}; +use crate::schema::compat::CompatWrite; +use crate::write_batch::{Mutation, PutData, WriteBatch}; + +impl CompatWrite for WriteBatch { + fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()> { + let (data_version, schema_version) = (dest_schema.version(), self.schema.version()); + // Fast path, nothing to do if schema version of the write batch is equal to version + // of destination. + if data_version == schema_version { + debug_assert_eq!(dest_schema.column_schemas(), self.schema.column_schemas()); + + return Ok(()); + } + + ensure!( + data_version > schema_version, + error::WriteToOldVersionSnafu { + data_version, + schema_version, + } + ); + + // For columns not in schema, returns error instead of discarding the column silently. + let column_not_in = column_not_in_schema(dest_schema, self.schema.column_schemas()); + ensure!( + column_not_in.is_none(), + error::NotInSchemaToCompatSnafu { + column: column_not_in.unwrap(), + version: data_version, + } + ); + + for m in &mut self.mutations { + match m { + Mutation::Put(put_data) => { + put_data.compat_write(dest_schema)?; + } + } + } + + // Change schema to `dest_schema`. + self.schema = dest_schema.clone(); + + Ok(()) + } +} + +impl CompatWrite for PutData { + fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()> { + if self.is_empty() { + return Ok(()); + } + + for column_schema in dest_schema.column_schemas() { + if self.column_by_name(&column_schema.name).is_none() { + // We need to fill the column by null or its default value. + self.add_default_by_name(column_schema) + .context(error::AddDefaultSnafu { + column: &column_schema.name, + })?; + } + } + + Ok(()) + } +} + +fn column_not_in_schema(schema: &SchemaRef, column_schemas: &[ColumnSchema]) -> Option { + column_schemas.iter().find_map(|col| { + if schema.column_schema_by_name(&col.name).is_none() { + Some(col.name.clone()) + } else { + None + } + }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnDefaultConstraint, SchemaBuilder}; + use datatypes::vectors::{Int32Vector, TimestampVector}; + use store_api::storage::{PutOperation, WriteRequest}; + + use super::*; + use crate::error::Error; + + fn new_test_schema_builder( + v0_constraint: Option>, + ) -> SchemaBuilder { + let mut column_schemas = vec![ + ColumnSchema::new("k0", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ]; + + if let Some(v0_constraint) = v0_constraint { + column_schemas.push( + ColumnSchema::new("v0", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(v0_constraint) + .unwrap(), + ); + } + + SchemaBuilder::try_from(column_schemas) + .unwrap() + .timestamp_index(1) + } + + fn new_test_schema(v0_constraint: Option>) -> SchemaRef { + let schema = new_test_schema_builder(v0_constraint).build().unwrap(); + + Arc::new(schema) + } + + fn new_put_data() -> PutData { + let mut put_data = PutData::new(); + let k0 = Arc::new(Int32Vector::from_slice(&[1, 2, 3])); + let ts = Arc::new(TimestampVector::from_values([11, 12, 13])); + + put_data.add_key_column("k0", k0).unwrap(); + put_data.add_key_column("ts", ts).unwrap(); + + put_data + } + + #[test] + fn test_put_data_compat_write() { + let mut put_data = new_put_data(); + let schema = new_test_schema(Some(Some(ColumnDefaultConstraint::null_value()))); + put_data.compat_write(&schema).unwrap(); + let v0 = put_data.column_by_name("v0").unwrap(); + assert!(v0.only_null()); + } + + #[test] + fn test_write_batch_compat_write() { + let schema_old = new_test_schema(None); + let mut batch = WriteBatch::new(schema_old); + let put_data = new_put_data(); + batch.put(put_data).unwrap(); + + let schema_new = Arc::new( + new_test_schema_builder(Some(Some(ColumnDefaultConstraint::null_value()))) + .version(1) + .build() + .unwrap(), + ); + batch.compat_write(&schema_new).unwrap(); + assert_eq!(schema_new, *batch.schema()); + let Mutation::Put(put_data) = batch.iter().next().unwrap(); + put_data.column_by_name("v0").unwrap(); + } + + #[test] + fn test_write_batch_compat_to_old() { + let schema_old = new_test_schema(None); + let schema_new = Arc::new( + new_test_schema_builder(None) + .version(1) // Bump the version + .build() + .unwrap(), + ); + + let mut batch = WriteBatch::new(schema_new); + let err = batch.compat_write(&schema_old).unwrap_err(); + assert!( + matches!(err, Error::WriteToOldVersion { .. }), + "err {} is not WriteToOldVersion", + err + ); + } + + #[test] + fn test_write_batch_skip_compat() { + let schema = new_test_schema(None); + let mut batch = WriteBatch::new(schema.clone()); + batch.compat_write(&schema).unwrap(); + } + + #[test] + fn test_write_batch_compat_columns_not_in_schema() { + let schema_has_column = new_test_schema(Some(None)); + let mut batch = WriteBatch::new(schema_has_column); + + let schema_no_column = Arc::new(new_test_schema_builder(None).version(1).build().unwrap()); + let err = batch.compat_write(&schema_no_column).unwrap_err(); + assert!( + matches!(err, Error::NotInSchemaToCompat { .. }), + "err {} is not NotInSchemaToCompat", + err + ); + } +} diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index a5808f3ef9..40d646c66d 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -10,10 +10,9 @@ pub type ColumnFamilyId = u32; /// Id of the region. pub type RegionId = u64; -// TODO(yingwen): Validate default value has same type with column, and name is a valid column name. /// A [ColumnDescriptor] contains information to create a column. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)] -#[builder(pattern = "owned")] +#[builder(pattern = "owned", build_fn(validate = "Self::validate"))] pub struct ColumnDescriptor { pub id: ColumnId, #[builder(setter(into))] @@ -21,15 +20,27 @@ pub struct ColumnDescriptor { pub data_type: ConcreteDataType, /// Is column nullable, default is true. #[builder(default = "true")] - pub is_nullable: bool, + is_nullable: bool, /// Default constraint of column, default is None, which means no default constraint /// for this column, and user must provide a value for a not-null column. #[builder(default)] - pub default_constraint: Option, + default_constraint: Option, #[builder(default, setter(into))] pub comment: String, } +impl ColumnDescriptor { + #[inline] + pub fn is_nullable(&self) -> bool { + self.is_nullable + } + + #[inline] + pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> { + self.default_constraint.as_ref() + } +} + impl ColumnDescriptorBuilder { pub fn new>(id: ColumnId, name: S, data_type: ConcreteDataType) -> Self { Self { @@ -39,12 +50,35 @@ impl ColumnDescriptorBuilder { ..Default::default() } } + + fn validate(&self) -> Result<(), String> { + if let Some(name) = &self.name { + if name.is_empty() { + return Err("name should not be empty".to_string()); + } + } + + if let (Some(Some(constraint)), Some(data_type)) = + (&self.default_constraint, &self.data_type) + { + // The default value of unwrap_or should be same as the default value + // defined in the `#[builder(default = "xxx")]` attribute. + let is_nullable = self.is_nullable.unwrap_or(true); + + constraint + .validate(data_type, is_nullable) + .map_err(|e| e.to_string())?; + } + + Ok(()) + } } impl From<&ColumnDescriptor> for ColumnSchema { fn from(desc: &ColumnDescriptor) -> ColumnSchema { ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable) .with_default_constraint(desc.default_constraint.clone()) + .expect("ColumnDescriptor should validate default constraint") } } @@ -139,7 +173,7 @@ mod tests { .is_nullable(false) .build() .unwrap(); - assert!(!desc.is_nullable); + assert!(!desc.is_nullable()); let desc = new_column_desc_builder() .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null))) @@ -147,7 +181,7 @@ mod tests { .unwrap(); assert_eq!( ColumnDefaultConstraint::Value(Value::Null), - desc.default_constraint.unwrap() + *desc.default_constraint().unwrap() ); let desc = new_column_desc_builder() @@ -164,6 +198,12 @@ mod tests { .build() .unwrap(); assert_eq!("A test column", desc.comment); + + new_column_desc_builder() + .is_nullable(false) + .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null))) + .build() + .unwrap_err(); } fn new_timestamp_desc() -> ColumnDescriptor { diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index cc17ebccbe..aac0b415d6 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -9,10 +9,14 @@ use datatypes::vectors::VectorRef; use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber}; /// Write request holds a collection of updates to apply to a region. +/// +/// The implementation of the write request should ensure all operations in +/// the request follows the same schema restriction. pub trait WriteRequest: Send { type Error: ErrorExt + Send + Sync; type PutOp: PutOperation; + /// Add put operation to the request. fn put(&mut self, put: Self::PutOp) -> Result<(), Self::Error>; /// Returns all possible time ranges that contain the timestamp in this batch. @@ -20,8 +24,10 @@ pub trait WriteRequest: Send { /// Each time range is aligned to given `duration`. fn time_ranges(&self, duration: Duration) -> Result, Self::Error>; + /// Create a new put operation. fn put_op(&self) -> Self::PutOp; + /// Create a new put operation with capacity reserved for `num_columns`. fn put_op_with_columns(num_columns: usize) -> Self::PutOp; } @@ -29,10 +35,13 @@ pub trait WriteRequest: Send { pub trait PutOperation: Send + std::fmt::Debug { type Error: ErrorExt + Send + Sync; + /// Put data to the key column. fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>; + /// Put data to the version column. fn add_version_column(&mut self, vector: VectorRef) -> Result<(), Self::Error>; + /// Put data to the value column. fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>; } diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 9f51b757f6..6d193c146c 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -143,8 +143,8 @@ fn build_row_key_desc( ts_column_schema.name.clone(), ts_column_schema.data_type.clone(), ) - .default_constraint(ts_column_schema.default_constraint.clone()) - .is_nullable(ts_column_schema.is_nullable) + .default_constraint(ts_column_schema.default_constraint().cloned()) + .is_nullable(ts_column_schema.is_nullable()) .build() .context(BuildColumnDescriptorSnafu { column_name: &ts_column_schema.name, @@ -169,8 +169,8 @@ fn build_row_key_desc( column_schema.name.clone(), column_schema.data_type.clone(), ) - .default_constraint(column_schema.default_constraint.clone()) - .is_nullable(column_schema.is_nullable) + .default_constraint(column_schema.default_constraint().cloned()) + .is_nullable(column_schema.is_nullable()) .build() .context(BuildColumnDescriptorSnafu { column_name: &column_schema.name, @@ -212,8 +212,8 @@ fn build_column_family( column_schema.name.clone(), column_schema.data_type.clone(), ) - .default_constraint(column_schema.default_constraint.clone()) - .is_nullable(column_schema.is_nullable) + .default_constraint(column_schema.default_constraint().cloned()) + .is_nullable(column_schema.is_nullable()) .build() .context(BuildColumnDescriptorSnafu { column_name: &column_schema.name, @@ -444,7 +444,8 @@ mod tests { let column_schemas = vec![ ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), ColumnSchema::new("n", ConcreteDataType::int32_datatype(), true) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32)))), + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32)))) + .unwrap(), ColumnSchema::new( "ts", ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index b03054103f..12194591df 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -173,8 +173,11 @@ pub enum Error { column_qualified_name: String, }, - #[snafu(display("Unsupported column default constraint: {}", expr))] - UnsupportedDefaultConstraint { expr: String, backtrace: Backtrace }, + #[snafu(display("Unsupported column default constraint, source: {}", source))] + UnsupportedDefaultConstraint { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } impl From for table::error::Error { diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index e05d2be875..5ee8aa396f 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -11,12 +11,8 @@ use common_query::logical_plan::Expr; use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult}; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use common_telemetry::logging; -use common_time::util; -use common_time::Timestamp; -use datatypes::data_type::DataType; -use datatypes::prelude::ScalarVector; -use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder}; -use datatypes::vectors::{ConstantVector, TimestampVector, VectorRef}; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use datatypes::vectors::VectorRef; use futures::task::{Context, Poll}; use futures::Stream; use object_store::ObjectStore; @@ -85,39 +81,35 @@ impl Table for MitoTable { // columns_values is not empty, it's safe to unwrap let rows_num = columns_values.values().next().unwrap().len(); - //Add row key and columns + // Add row key columns for name in key_columns { let column_schema = schema .column_schema_by_name(name) .expect("column schema not found"); - let vector = columns_values.remove(name).or_else(|| { - Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()? - }); + let vector = match columns_values.remove(name) { + Some(v) => v, + None => Self::try_get_column_default_constraint_vector(column_schema, rows_num)?, + }; - if let Some(vector) = vector { - put_op - .add_key_column(name, vector) - .map_err(TableError::new)?; - } else if !column_schema.is_nullable { - return MissingColumnSnafu { name }.fail()?; - } + put_op + .add_key_column(name, vector) + .map_err(TableError::new)?; } + // Add value columns for name in value_columns { let column_schema = schema .column_schema_by_name(name) .expect("column schema not found"); - let vector = columns_values.remove(name).or_else(|| { - Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()? - }); - - if let Some(v) = vector { - put_op.add_value_column(name, v).map_err(TableError::new)?; - } else if !column_schema.is_nullable { - return MissingColumnSnafu { name }.fail()?; - } + let vector = match columns_values.remove(name) { + Some(v) => v, + None => Self::try_get_column_default_constraint_vector(column_schema, rows_num)?, + }; + put_op + .add_value_column(name, vector) + .map_err(TableError::new)?; } ensure!( @@ -132,7 +124,7 @@ impl Table for MitoTable { } ); - logging::debug!( + logging::trace!( "Insert into table {} with put_op: {:?}", table_info.name, put_op @@ -220,8 +212,8 @@ impl Table for MitoTable { &new_column.name, new_column.data_type.clone(), ) - .is_nullable(new_column.is_nullable) - .default_constraint(new_column.default_constraint.clone()) + .is_nullable(new_column.is_nullable()) + .default_constraint(new_column.default_constraint().cloned()) .build() .context(error::BuildColumnDescriptorSnafu { table_name, @@ -468,41 +460,16 @@ impl MitoTable { fn try_get_column_default_constraint_vector( column_schema: &ColumnSchema, rows_num: usize, - ) -> TableResult> { + ) -> TableResult { // TODO(dennis): when we support altering schema, we should check the schemas difference between table and region - if let Some(v) = &column_schema.default_constraint { - assert!(rows_num > 0); + let vector = column_schema + .create_default_vector(rows_num) + .context(UnsupportedDefaultConstraintSnafu)? + .context(MissingColumnSnafu { + name: &column_schema.name, + })?; - match v { - ColumnDefaultConstraint::Value(v) => { - let mut mutable_vector = column_schema.data_type.create_mutable_vector(1); - mutable_vector - .push_value_ref(v.as_value_ref()) - .map_err(TableError::new)?; - let vector = - Arc::new(ConstantVector::new(mutable_vector.to_vector(), rows_num)); - Ok(Some(vector)) - } - ColumnDefaultConstraint::Function(expr) => { - match &expr[..] { - // TODO(dennis): we only supports current_timestamp right now, - // it's better to use a expression framework in future. - "current_timestamp()" => { - let vector = - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( - util::current_time_millis(), - )])); - Ok(Some(Arc::new(ConstantVector::new(vector, rows_num)))) - } - _ => UnsupportedDefaultConstraintSnafu { expr } - .fail() - .map_err(TableError::new), - } - } - } - } else { - Ok(None) - } + Ok(vector) } pub async fn open( diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index 0a3a34bb38..6972c27d4b 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -125,6 +125,7 @@ pub struct MockRegionInner { memtable: Arc>, } +/// A columnar memtable, maps column name to data of that column in each row. type MockMemtable = HashMap>; #[async_trait] @@ -189,11 +190,10 @@ impl MockRegionInner { { let mut memtable = self.memtable.write().unwrap(); + // Now drop columns is not supported. let rows = memtable.values().last().unwrap().len(); - - // currently dropping columns are not supported, so we only add columns here for column in metadata.user_schema().column_schemas() { - let _ = memtable + memtable .entry(column.name.clone()) .or_insert_with(|| vec![Value::Null; rows]); } @@ -211,8 +211,6 @@ impl MockRegionInner { let column = memtable.get_mut(name).unwrap(); if let Some(data) = put.column_by_name(name) { (0..data.len()).for_each(|i| column.push(data.get(i))); - } else { - column.extend_from_slice(&vec![Value::Null; put.num_rows()]); } } }