From 2373d676f7ba3d09678d92e87868a2a29f4c9fbb Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 23 Aug 2022 18:04:32 +0800 Subject: [PATCH] feat: add Date type and value (#189) * wip: add Date type and value * fix some cr comments * impl Date values * finish date type * optimize Date value serialization * add some tests * fix some cr comments * add some more test --- Cargo.lock | 6 + .../query/src/logical_plan/accumulator.rs | 2 +- src/common/time/Cargo.toml | 4 + src/common/time/src/date.rs | 111 +++++++++ src/common/time/src/error.rs | 14 ++ src/common/time/src/lib.rs | 2 + src/datanode/Cargo.toml | 3 +- src/datanode/src/sql/create.rs | 37 ++- src/datanode/src/sql/insert.rs | 37 ++- src/datatypes/Cargo.toml | 5 +- src/datatypes/src/data_type.rs | 21 +- src/datatypes/src/scalars.rs | 49 +++- src/datatypes/src/type_id.rs | 3 +- src/datatypes/src/types.rs | 2 + src/datatypes/src/types/date.rs | 34 +++ src/datatypes/src/value.rs | 9 +- src/datatypes/src/vectors.rs | 1 + src/datatypes/src/vectors/builder.rs | 31 +++ src/datatypes/src/vectors/date.rs | 230 ++++++++++++++++++ src/datatypes/src/vectors/helper.rs | 28 +++ src/datatypes/src/vectors/primitive.rs | 5 +- src/servers/src/mysql/writer.rs | 2 +- 22 files changed, 615 insertions(+), 21 deletions(-) create mode 100644 src/common/time/src/date.rs create mode 100644 src/common/time/src/error.rs create mode 100644 src/datatypes/src/types/date.rs create mode 100644 src/datatypes/src/vectors/date.rs diff --git a/Cargo.lock b/Cargo.lock index 24c6a95375..7f7de6eacd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -826,6 +826,10 @@ name = "common-time" version = "0.1.0" dependencies = [ "chrono", + "common-error", + "serde", + "serde_json", + "snafu", ] [[package]] @@ -1241,6 +1245,7 @@ dependencies = [ "common-recordbatch", "common-runtime", "common-telemetry", + "common-time", "datafusion", "datatypes", "hyper", @@ -1272,6 +1277,7 @@ dependencies = [ "arrow2", "common-base", "common-error", + "common-time", "datafusion-common", "enum_dispatch", "num", diff --git a/src/common/query/src/logical_plan/accumulator.rs b/src/common/query/src/logical_plan/accumulator.rs index b1d6f077fb..ee411efd5a 100644 --- a/src/common/query/src/logical_plan/accumulator.rs +++ b/src/common/query/src/logical_plan/accumulator.rs @@ -180,7 +180,7 @@ fn try_into_scalar_value(value: Value, datatype: &ConcreteDataType) -> Result ScalarValue::Float64(Some(v.0)), Value::String(v) => ScalarValue::LargeUtf8(Some(v.as_utf8().to_string())), Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())), - Value::Date(v) => ScalarValue::Date32(Some(v)), + Value::Date(v) => ScalarValue::Date32(Some(v.val())), Value::DateTime(v) => ScalarValue::Date64(Some(v)), Value::Null => try_convert_null_value(datatype)?, Value::List(list) => try_convert_list_value(list)?, diff --git a/src/common/time/Cargo.toml b/src/common/time/Cargo.toml index 778f857eb7..430d4c3cdc 100644 --- a/src/common/time/Cargo.toml +++ b/src/common/time/Cargo.toml @@ -7,3 +7,7 @@ edition = "2021" [dependencies] chrono = "0.4" +common-error = { path = "../error" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/time/src/date.rs b/src/common/time/src/date.rs new file mode 100644 index 0000000000..deb4893c7e --- /dev/null +++ b/src/common/time/src/date.rs @@ -0,0 +1,111 @@ +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use chrono::{Datelike, NaiveDate}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use snafu::{ensure, ResultExt}; + +use crate::error::Result; +use crate::error::{DateOverflowSnafu, Error, ParseDateStrSnafu}; + +const UNIX_EPOCH_FROM_CE: i32 = 719_163; + +/// ISO 8601 [Date] values. The inner representation is a signed 32 bit integer that represents the +/// **days since "1970-01-01 00:00:00 UTC" (UNIX Epoch)**. +/// +/// [Date] value ranges between "0000-01-01" to "9999-12-31". +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Deserialize, Serialize, +)] +pub struct Date(i32); + +impl From for Value { + fn from(d: Date) -> Self { + Value::String(d.to_string()) + } +} + +impl FromStr for Date { + type Err = Error; + + fn from_str(s: &str) -> Result { + let date = NaiveDate::parse_from_str(s, "%F").context(ParseDateStrSnafu { raw: s })?; + Ok(Self(date.num_days_from_ce() - UNIX_EPOCH_FROM_CE)) + } +} + +impl Display for Date { + /// [Date] is formatted according to ISO-8601 standard. + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let abs_date = NaiveDate::from_num_days_from_ce(UNIX_EPOCH_FROM_CE + self.0); + f.write_str(&abs_date.format("%F").to_string()) + } +} + +impl Date { + pub fn try_new(val: i32) -> Result { + ensure!( + val >= Self::MIN.0 && val <= Self::MAX.0, + DateOverflowSnafu { value: val } + ); + + Ok(Self(val)) + } + + pub fn val(&self) -> i32 { + self.0 + } + + /// Max valid Date value: "9999-12-31" + pub const MAX: Date = Date(2932896); + /// Min valid Date value: "1000-01-01" + pub const MIN: Date = Date(-354285); +} + +#[cfg(test)] +mod tests { + use chrono::Utc; + + use super::*; + + #[test] + pub fn test_print_date2() { + assert_eq!("1969-12-31", Date::try_new(-1).unwrap().to_string()); + assert_eq!("1970-01-01", Date::try_new(0).unwrap().to_string()); + assert_eq!("1970-02-12", Date::try_new(42).unwrap().to_string()); + } + + #[test] + pub fn test_date_parse() { + assert_eq!( + "1970-01-01", + Date::from_str("1970-01-01").unwrap().to_string() + ); + + assert_eq!( + "1969-01-01", + Date::from_str("1969-01-01").unwrap().to_string() + ); + + let now = Utc::now().date().format("%F").to_string(); + assert_eq!(now, Date::from_str(&now).unwrap().to_string()); + } + + #[test] + pub fn test_illegal_date_values() { + assert!(Date::try_new(Date::MAX.0 + 1).is_err()); + assert!(Date::try_new(Date::MIN.0 - 1).is_err()); + } + + #[test] + pub fn test_edge_date_values() { + let date = Date::from_str("9999-12-31").unwrap(); + assert_eq!(Date::MAX.0, date.0); + assert_eq!(date, Date::try_new(date.0).unwrap()); + + let date = Date::from_str("1000-01-01").unwrap(); + assert_eq!(Date::MIN.0, date.0); + assert_eq!(date, Date::try_new(date.0).unwrap()); + } +} diff --git a/src/common/time/src/error.rs b/src/common/time/src/error.rs new file mode 100644 index 0000000000..371d6aa76d --- /dev/null +++ b/src/common/time/src/error.rs @@ -0,0 +1,14 @@ +use chrono::ParseError; +use snafu::Snafu; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to parse string to date, raw: {}, source: {}", raw, source))] + ParseDateStr { raw: String, source: ParseError }, + + #[snafu(display("Failed to parse i32 value to date: {}", value))] + DateOverflow { value: i32 }, +} + +pub type Result = std::result::Result; diff --git a/src/common/time/src/lib.rs b/src/common/time/src/lib.rs index 23e9871081..43371f84d2 100644 --- a/src/common/time/src/lib.rs +++ b/src/common/time/src/lib.rs @@ -1,3 +1,5 @@ +pub mod date; +pub mod error; pub mod range; pub mod timestamp; pub mod util; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 2af95caaa5..b3e85e3a01 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -21,7 +21,8 @@ common-error = { path = "../common/error" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } -datatypes = { path = "../datatypes"} +common-time = { path = "../common/time" } +datatypes = { path = "../datatypes" } hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } metrics = "0.20" diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index b91429a582..c99a7d1de1 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -200,7 +200,8 @@ fn sql_data_type_to_concrete_data_type(t: &SqlDataType) -> Result Ok(ConcreteDataType::float32_datatype()), SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()), SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()), - // TODO(hl): Date/DateTime/Timestamp not supported + SqlDataType::Date => Ok(ConcreteDataType::date_datatype()), + // TODO(hl): DateTime not supported _ => SqlTypeNotSupportedSnafu { t: t.clone() }.fail(), } } @@ -353,4 +354,38 @@ mod tests { .data_type ); } + + fn check_type(sql_type: SqlDataType, data_type: ConcreteDataType) { + assert_eq!( + data_type, + sql_data_type_to_concrete_data_type(&sql_type).unwrap() + ); + } + + #[test] + pub fn test_sql_data_type_to_concrete_data_type() { + check_type( + SqlDataType::BigInt(None), + ConcreteDataType::int64_datatype(), + ); + check_type(SqlDataType::Int(None), ConcreteDataType::int32_datatype()); + check_type( + SqlDataType::SmallInt(None), + ConcreteDataType::int16_datatype(), + ); + check_type(SqlDataType::Char(None), ConcreteDataType::string_datatype()); + check_type( + SqlDataType::Varchar(None), + ConcreteDataType::string_datatype(), + ); + check_type(SqlDataType::Text, ConcreteDataType::string_datatype()); + check_type(SqlDataType::String, ConcreteDataType::string_datatype()); + check_type( + SqlDataType::Float(None), + ConcreteDataType::float32_datatype(), + ); + check_type(SqlDataType::Double, ConcreteDataType::float64_datatype()); + check_type(SqlDataType::Boolean, ConcreteDataType::boolean_datatype()); + check_type(SqlDataType::Date, ConcreteDataType::date_datatype()); + } } diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index a186b3f487..107fdcb74d 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -155,13 +155,32 @@ fn parse_sql_value( } ); - s.to_owned().into() + parse_string_to_value(s.to_owned(), data_type)? } _ => todo!("Other sql value"), }) } +fn parse_string_to_value(s: String, data_type: &ConcreteDataType) -> Result { + match data_type { + ConcreteDataType::String(_) => Ok(Value::String(s.into())), + ConcreteDataType::Date(_) => { + if let Ok(date) = common_time::date::Date::from_str(&s) { + Ok(Value::Date(date)) + } else { + ParseSqlValueSnafu { + msg: format!("Failed to parse {} to Date value", s), + } + .fail() + } + } + _ => { + unreachable!() + } + } +} + macro_rules! parse_number_to_value { ($data_type: expr, $n: ident, $(($Type: ident, $PrimitiveType: ident)), +) => { match $data_type { @@ -260,4 +279,20 @@ mod tests { "column_name: \"a\", expect: Float64(Float64), actual: Boolean(BooleanType)" )); } + + #[test] + pub fn test_parse_date_literal() { + let value = parse_sql_value( + "date", + &ConcreteDataType::date_datatype(), + &SqlValue::DoubleQuotedString("2022-02-22".to_string()), + ) + .unwrap(); + assert_eq!(ConcreteDataType::date_datatype(), value.data_type()); + if let Value::Date(d) = value { + assert_eq!("2022-02-22", d.to_string()); + } else { + unreachable!() + } + } } diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 2f07ea8166..39ecbf682b 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -10,12 +10,13 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dependencies] common-base = { path = "../common/base" } +common-time = { path = "../common/time" } common-error = { path = "../common/error" } -datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2" } +datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } enum_dispatch = "0.3" num = "0.4" num-traits = "0.2" -ordered-float = { version = "3.0", features = ["serde"]} +ordered-float = { version = "3.0", features = ["serde"] } paste = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index b472155d19..bbd7382473 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize}; use crate::error::{self, Error, Result}; use crate::type_id::LogicalTypeId; use crate::types::{ - BinaryType, BooleanType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, - ListType, NullType, StringType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + BinaryType, BooleanType, DateType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, + Int8Type, ListType, NullType, StringType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use crate::value::Value; @@ -34,6 +34,8 @@ pub enum ConcreteDataType { Binary(BinaryType), String(StringType), + Date(DateType), + List(ListType), } @@ -50,7 +52,10 @@ impl ConcreteDataType { } pub fn is_string(&self) -> bool { - matches!(self, ConcreteDataType::String(_)) + matches!( + self, + ConcreteDataType::String(_) | ConcreteDataType::Date(_) + ) } pub fn is_signed(&self) -> bool { @@ -60,6 +65,7 @@ impl ConcreteDataType { | ConcreteDataType::Int16(_) | ConcreteDataType::Int32(_) | ConcreteDataType::Int64(_) + | ConcreteDataType::Date(_) ) } @@ -118,6 +124,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::Int64 => Self::int64_datatype(), ArrowDataType::Float32 => Self::float32_datatype(), ArrowDataType::Float64 => Self::float64_datatype(), + ArrowDataType::Date32 => Self::date_datatype(), ArrowDataType::Binary | ArrowDataType::LargeBinary => Self::binary_datatype(), ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => Self::string_datatype(), ArrowDataType::List(field) => Self::List(ListType::new( @@ -151,7 +158,7 @@ macro_rules! impl_new_concrete_type_functions { impl_new_concrete_type_functions!( Null, Boolean, UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64, - Binary, String + Binary, String, Date ); impl ConcreteDataType { @@ -264,9 +271,13 @@ mod tests { ConcreteDataType::from_arrow_type(&ArrowDataType::List(Box::new(Field::new( "item", ArrowDataType::Int32, - true + true, )))), ConcreteDataType::List(ListType::new(ConcreteDataType::int32_datatype())) ); + assert!(matches!( + ConcreteDataType::from_arrow_type(&ArrowDataType::Date32), + ConcreteDataType::Date(_) + )); } } diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index b27097bf10..987f4419cc 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -1,9 +1,11 @@ use std::any::Any; -pub mod common; use crate::prelude::*; +use crate::vectors::date::DateVector; use crate::vectors::*; +pub mod common; + fn get_iter_capacity>(iter: &I) -> usize { match iter.size_hint() { (_lower, Some(upper)) => upper, @@ -239,8 +241,32 @@ impl<'a> ScalarRef<'a> for &'a [u8] { } } +impl Scalar for common_time::date::Date { + type VectorType = DateVector; + type RefType<'a> = common_time::date::Date; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + *self + } + + fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> { + long + } +} + +impl<'a> ScalarRef<'a> for common_time::date::Date { + type VectorType = DateVector; + type ScalarType = common_time::date::Date; + + fn to_owned_scalar(&self) -> Self::ScalarType { + *self + } +} + #[cfg(test)] mod tests { + use common_time::date::Date; + use super::*; use crate::vectors::binary::BinaryVector; use crate::vectors::primitive::Int32Vector; @@ -282,4 +308,25 @@ mod tests { let vector: BinaryVector = build_vector_from_slice(&expect); assert_vector_eq(&expect, &vector); } + + #[test] + pub fn test_build_date_vector() { + let expect: Vec> = vec![ + Some(Date::try_new(0).unwrap()), + Some(Date::try_new(-1).unwrap()), + Some(Date::try_new(1).unwrap()), + None, + Some(Date::MAX), + Some(Date::MIN), + ]; + let vector: DateVector = build_vector_from_slice(&expect); + assert_vector_eq(&expect, &vector); + } + + #[test] + pub fn test_date_scalar() { + let date = Date::try_new(1).unwrap(); + assert_eq!(date, date.as_scalar_ref()); + assert_eq!(date, date.to_owned_scalar()); + } } diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 4372afe1bb..2d008da11e 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -52,7 +52,8 @@ impl LogicalTypeId { LogicalTypeId::Float64 => ConcreteDataType::float64_datatype(), LogicalTypeId::String => ConcreteDataType::string_datatype(), LogicalTypeId::Binary => ConcreteDataType::binary_datatype(), - LogicalTypeId::Date | LogicalTypeId::DateTime | LogicalTypeId::List => { + LogicalTypeId::Date => ConcreteDataType::date_datatype(), + LogicalTypeId::DateTime | LogicalTypeId::List => { unimplemented!("Data type for {:?} is unimplemented", self) } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index efa3224423..eb53902f78 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -1,5 +1,6 @@ mod binary_type; mod boolean_type; +mod date; mod list_type; mod null_type; mod primitive_traits; @@ -8,6 +9,7 @@ mod string_type; pub use binary_type::BinaryType; pub use boolean_type::BooleanType; +pub use date::DateType; pub use list_type::ListType; pub use null_type::NullType; pub use primitive_traits::Primitive; diff --git a/src/datatypes/src/types/date.rs b/src/datatypes/src/types/date.rs new file mode 100644 index 0000000000..b362d30799 --- /dev/null +++ b/src/datatypes/src/types/date.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use arrow::datatypes::DataType as ArrowDataType; +use serde::{Deserialize, Serialize}; + +use crate::data_type::DataType; +use crate::prelude::{DataTypeRef, LogicalTypeId, Value}; + +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +pub struct DateType; + +impl DataType for DateType { + fn name(&self) -> &str { + "Date" + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::Date + } + + fn default_value(&self) -> Value { + Value::Date(Default::default()) + } + + fn as_arrow_type(&self) -> ArrowDataType { + ArrowDataType::Date32 + } +} + +impl DateType { + pub fn arc() -> DataTypeRef { + Arc::new(Self) + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 80248980f9..d76acb18b0 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -36,7 +36,7 @@ pub enum Value { Binary(Bytes), // Date & Time types: - Date(i32), + Date(common_time::date::Date), DateTime(i64), List(ListValue), @@ -64,7 +64,8 @@ impl Value { Value::String(_) => ConcreteDataType::string_datatype(), Value::Binary(_) => ConcreteDataType::binary_datatype(), Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), - Value::Date(_) | Value::DateTime(_) => { + Value::Date(_) => ConcreteDataType::date_datatype(), + Value::DateTime(_) => { unimplemented!("Unsupported data type of value {:?}", self) } } @@ -151,7 +152,7 @@ impl TryFrom for serde_json::Value { Value::Float64(v) => serde_json::Value::from(v.0), Value::String(bytes) => serde_json::Value::String(bytes.as_utf8().to_string()), Value::Binary(bytes) => serde_json::to_value(bytes)?, - Value::Date(v) => serde_json::Value::Number(v.into()), + Value::Date(v) => serde_json::Value::Number(v.val().into()), Value::DateTime(v) => serde_json::Value::Number(v.into()), Value::List(v) => serde_json::to_value(v)?, }; @@ -405,7 +406,7 @@ mod tests { ); assert_eq!( serde_json::Value::Number(5000i32.into()), - to_json(Value::Date(5000)) + to_json(Value::Date(common_time::date::Date::try_new(5000).unwrap())) ); assert_eq!( serde_json::Value::Number(5000i64.into()), diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index f7b249fd44..d451ccf4e2 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -2,6 +2,7 @@ pub mod binary; pub mod boolean; mod builder; pub mod constant; +pub mod date; mod helper; mod list; pub mod mutable; diff --git a/src/datatypes/src/vectors/builder.rs b/src/datatypes/src/vectors/builder.rs index a5911652a4..c4983f9149 100644 --- a/src/datatypes/src/vectors/builder.rs +++ b/src/datatypes/src/vectors/builder.rs @@ -1,8 +1,11 @@ use std::sync::Arc; +use common_time::date::Date; + use crate::data_type::ConcreteDataType; use crate::scalars::ScalarVectorBuilder; use crate::value::Value; +use crate::vectors::date::DateVectorBuilder; use crate::vectors::{ BinaryVectorBuilder, BooleanVectorBuilder, Float32VectorBuilder, Float64VectorBuilder, Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, MutableVector, @@ -29,6 +32,8 @@ pub enum VectorBuilder { // String types: String(StringVectorBuilder), Binary(BinaryVectorBuilder), + + Date(DateVectorBuilder), } impl VectorBuilder { @@ -78,6 +83,9 @@ impl VectorBuilder { ConcreteDataType::Binary(_) => { VectorBuilder::Binary(BinaryVectorBuilder::with_capacity(capacity)) } + ConcreteDataType::Date(_) => { + VectorBuilder::Date(DateVectorBuilder::with_capacity(capacity)) + } _ => unimplemented!(), } } @@ -98,6 +106,7 @@ impl VectorBuilder { VectorBuilder::Float64(b) => b.data_type(), VectorBuilder::String(b) => b.data_type(), VectorBuilder::Binary(b) => b.data_type(), + VectorBuilder::Date(b) => b.data_type(), } } @@ -121,6 +130,8 @@ impl VectorBuilder { (VectorBuilder::Float64(b), Value::Float64(v)) => b.push(Some(v.into_inner())), (VectorBuilder::String(b), Value::String(v)) => b.push(Some(v.as_utf8())), (VectorBuilder::Binary(b), Value::Binary(v)) => b.push(Some(v)), + (VectorBuilder::Date(b), Value::Date(v)) => b.push(Some(*v)), + (VectorBuilder::Date(b), Value::Int32(v)) => b.push(Some(Date::try_new(*v).unwrap())), _ => panic!( "Value {:?} does not match builder type {:?}", value, @@ -145,6 +156,7 @@ impl VectorBuilder { VectorBuilder::Float64(b) => b.push(None), VectorBuilder::String(b) => b.push(None), VectorBuilder::Binary(b) => b.push(None), + VectorBuilder::Date(b) => b.push(None), } } @@ -164,6 +176,7 @@ impl VectorBuilder { VectorBuilder::Float64(b) => Arc::new(b.finish()), VectorBuilder::String(b) => Arc::new(b.finish()), VectorBuilder::Binary(b) => Arc::new(b.finish()), + VectorBuilder::Date(b) => Arc::new(b.finish()), } } } @@ -173,6 +186,8 @@ mod tests { use ordered_float::OrderedFloat; use super::*; + use crate::prelude::Vector; + use crate::vectors::date::DateVector; macro_rules! impl_integer_builder_test { ($Type: ident, $datatype: ident) => { @@ -257,4 +272,20 @@ mod tests { let vector = builder.finish(); assert_eq!(Value::String(hello.into()), vector.get(0)); } + + #[test] + pub fn test_date_vector_builder() { + let mut builder = VectorBuilder::with_capacity(ConcreteDataType::date_datatype(), 3); + assert_eq!(ConcreteDataType::date_datatype(), builder.data_type()); + builder.push_null(); + builder.push(&Value::Date(Date::try_new(123).unwrap())); + let v = builder.finish(); + let v = v.as_any().downcast_ref::().unwrap(); + assert_eq!(Value::Null, v.get(0)); + assert_eq!(Value::Date(Date::try_new(123).unwrap()), v.get(1)); + assert_eq!( + &arrow::datatypes::DataType::Date32, + v.to_arrow_array().data_type() + ); + } } diff --git a/src/datatypes/src/vectors/date.rs b/src/datatypes/src/vectors/date.rs new file mode 100644 index 0000000000..37cfb5998f --- /dev/null +++ b/src/datatypes/src/vectors/date.rs @@ -0,0 +1,230 @@ +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, PrimitiveArray}; +use common_time::date::Date; +use snafu::OptionExt; + +use crate::data_type::ConcreteDataType; +use crate::error::ConversionSnafu; +use crate::prelude::{ScalarVectorBuilder, Validity, Value, Vector, VectorRef}; +use crate::scalars::ScalarVector; +use crate::serialize::Serializable; +use crate::vectors::{MutableVector, PrimitiveIter, PrimitiveVector, PrimitiveVectorBuilder}; + +#[derive(Debug, Clone)] +pub struct DateVector { + array: PrimitiveVector, +} + +impl DateVector { + pub fn new(array: PrimitiveArray) -> Self { + Self { + array: PrimitiveVector { array }, + } + } + + pub fn try_from_arrow_array(array: impl AsRef) -> crate::error::Result { + Ok(Self::new( + array + .as_ref() + .as_any() + .downcast_ref::>() + .with_context(|| ConversionSnafu { + from: format!("{:?}", array.as_ref().data_type()), + })? + .clone(), + )) + } +} + +impl Vector for DateVector { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::date_datatype() + } + + fn vector_type_name(&self) -> String { + "DateVector".to_string() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn len(&self) -> usize { + self.array.len() + } + + fn to_arrow_array(&self) -> ArrayRef { + let validity = self.array.array.validity().cloned(); + let buffer = self.array.array.values().clone(); + Arc::new(PrimitiveArray::new( + arrow::datatypes::DataType::Date32, + buffer, + validity, + )) + } + + fn validity(&self) -> Validity { + self.array.validity() + } + + fn memory_size(&self) -> usize { + self.array.memory_size() + } + + fn is_null(&self, row: usize) -> bool { + self.array.is_null(row) + } + + fn slice(&self, offset: usize, length: usize) -> VectorRef { + self.array.slice(offset, length) + } + + fn get(&self, index: usize) -> Value { + match self.array.get(index) { + Value::Int32(v) => { + Value::Date(Date::try_new(v).expect("Not expected to overflow here")) + } + Value::Null => Value::Null, + _ => { + unreachable!() + } + } + } + + fn replicate(&self, offsets: &[usize]) -> VectorRef { + self.array.replicate(offsets) + } +} + +impl From>> for DateVector { + fn from(data: Vec>) -> Self { + Self { + array: PrimitiveVector::::from(data), + } + } +} + +pub struct DateIter<'a> { + iter: PrimitiveIter<'a, i32>, +} + +impl<'a> Iterator for DateIter<'a> { + type Item = Option; + + fn next(&mut self) -> Option { + self.iter + .next() + .map(|v| v.map(|v| Date::try_new(v).unwrap())) + } +} + +impl ScalarVector for DateVector { + type OwnedItem = Date; + type RefItem<'a> = Date; + type Iter<'a> = DateIter<'a>; + + type Builder = DateVectorBuilder; + + fn get_data(&self, idx: usize) -> Option> { + self.array.get_data(idx).map(|v| Date::try_new(v).unwrap()) + } + + fn iter_data(&self) -> Self::Iter<'_> { + DateIter { + iter: self.array.iter_data(), + } + } +} + +impl Serializable for DateVector { + fn serialize_to_json(&self) -> crate::error::Result> { + Ok(self + .array + .iter_data() + .map(|v| v.map(|d| Date::try_new(d).unwrap())) + .map(|v| match v { + None => serde_json::Value::Null, + Some(v) => v.into(), + }) + .collect::>()) + } +} + +pub struct DateVectorBuilder { + buffer: PrimitiveVectorBuilder, +} + +impl MutableVector for DateVectorBuilder { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::date_datatype() + } + + fn len(&self) -> usize { + self.buffer.len() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn to_vector(&mut self) -> VectorRef { + Arc::new(self.finish()) + } +} + +impl ScalarVectorBuilder for DateVectorBuilder { + type VectorType = DateVector; + + fn with_capacity(capacity: usize) -> Self { + Self { + buffer: PrimitiveVectorBuilder::with_capacity(capacity), + } + } + + fn push(&mut self, value: Option<::RefItem<'_>>) { + self.buffer.push(value.map(|d| d.val())) + } + + fn finish(&mut self) -> Self::VectorType { + Self::VectorType { + array: self.buffer.finish(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_build_date_vector() { + let mut builder = DateVectorBuilder::with_capacity(4); + builder.push(Some(Date::try_new(1).unwrap())); + builder.push(None); + builder.push(Some(Date::try_new(-1).unwrap())); + let vector = builder.finish(); + assert_eq!(3, vector.len()); + assert_eq!(Some(Date::try_new(1).unwrap()), vector.get_data(0)); + assert_eq!(None, vector.get_data(1)); + assert_eq!(Some(Date::try_new(-1).unwrap()), vector.get_data(2)); + let mut iter = vector.iter_data(); + assert_eq!(Some(Date::try_new(1).unwrap()), iter.next().unwrap()); + assert_eq!(None, iter.next().unwrap()); + assert_eq!(Some(Date::try_new(-1).unwrap()), iter.next().unwrap()); + } + + #[test] + pub fn test_date_scalar() { + let vector = + DateVector::from_slice(&[Date::try_new(1).unwrap(), Date::try_new(2).unwrap()]); + assert_eq!(2, vector.len()); + assert_eq!(Some(Date::try_new(1).unwrap()), vector.get_data(0)); + assert_eq!(Some(Date::try_new(2).unwrap()), vector.get_data(1)); + } +} diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index dbd7607f9e..191f138ef6 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -10,6 +10,7 @@ use snafu::OptionExt; use crate::error::{ConversionSnafu, Result, UnknownVectorSnafu}; use crate::scalars::*; +use crate::vectors::date::DateVector; use crate::vectors::*; pub struct Helper; @@ -137,6 +138,9 @@ impl Helper { ScalarValue::LargeBinary(v) => { ConstantVector::new(Arc::new(BinaryVector::from(vec![v])), length) } + ScalarValue::Date32(v) => { + ConstantVector::new(Arc::new(DateVector::from(vec![v])), length) + } _ => { return ConversionSnafu { from: format!("Unsupported scalar value: {}", value), @@ -172,6 +176,7 @@ impl Helper { ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => { Arc::new(StringVector::try_from_arrow_array(array)?) } + ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?), ArrowDataType::List(_) => Arc::new(ListVector::try_from_arrow_array(array)?), _ => unimplemented!("Arrow array datatype: {:?}", array.as_ref().data_type()), }) @@ -185,6 +190,7 @@ impl Helper { #[cfg(test)] mod tests { use arrow::array::Int32Array; + use common_time::date::Date; use super::*; @@ -203,4 +209,26 @@ mod tests { assert_eq!(Value::Int32(2), vectors[1].get(0)); assert_eq!(Value::Int32(3), vectors[2].get(0)); } + + #[test] + pub fn test_try_into_date_vector() { + let vector = DateVector::from(vec![Some(1), Some(2), None]); + let arrow_array = vector.to_arrow_array(); + assert_eq!(&arrow::datatypes::DataType::Date32, arrow_array.data_type()); + let vector_converted = Helper::try_into_vector(arrow_array).unwrap(); + assert_eq!(vector.len(), vector_converted.len()); + for i in 0..vector_converted.len() { + assert_eq!(vector.get(i), vector_converted.get(i)); + } + } + + #[test] + pub fn test_try_from_scalar_date_value() { + let vector = Helper::try_from_scalar_value(ScalarValue::Date32(Some(42)), 3).unwrap(); + assert_eq!(ConcreteDataType::date_datatype(), vector.data_type()); + assert_eq!(3, vector.len()); + for i in 0..vector.len() { + assert_eq!(Value::Date(Date::try_new(42).unwrap()), vector.get(i)); + } + } } diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 03b027f39e..851d16ac29 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -21,7 +21,7 @@ use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// Vector for primitive data types. #[derive(Debug, Clone)] pub struct PrimitiveVector { - array: PrimitiveArray, + pub(crate) array: PrimitiveArray, } impl PrimitiveVector { @@ -204,7 +204,7 @@ impl<'a, T: Copy> Iterator for PrimitiveIter<'a, T> { } pub struct PrimitiveVectorBuilder { - mutable_array: MutablePrimitiveArray, + pub(crate) mutable_array: MutablePrimitiveArray, } impl PrimitiveVectorBuilder { @@ -289,7 +289,6 @@ impl Serializable for PrimitiveVector { #[cfg(test)] mod tests { - use arrow::datatypes::DataType as ArrowDataType; use serde_json; diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 0b21561da4..cc973d1735 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -101,7 +101,7 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> { Value::Float64(v) => row_writer.write_col(v.0)?, Value::String(v) => row_writer.write_col(v.as_utf8())?, Value::Binary(v) => row_writer.write_col(v.deref())?, - Value::Date(v) => row_writer.write_col(v)?, + Value::Date(v) => row_writer.write_col(v.val())?, Value::DateTime(v) => row_writer.write_col(v)?, Value::List(_) => { return Err(Error::Internal {