diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 003c0ed14a..93d9450631 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -12,6 +12,7 @@ use crate::types::{ Int8Type, ListType, NullType, StringType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use crate::value::Value; +use crate::vectors::MutableVector; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[enum_dispatch::enum_dispatch(DataType)] @@ -185,6 +186,9 @@ pub trait DataType: std::fmt::Debug + Send + Sync { /// Convert this type as [arrow2::datatypes::DataType]. fn as_arrow_type(&self) -> ArrowDataType; + + /// Create a mutable vector with given `capacity` of this type. + fn create_mutable_vector(&self, capacity: usize) -> Box; } pub type DataTypeRef = Arc; diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 61403d1481..d67cfcf9a5 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -47,6 +47,9 @@ pub enum Error { #[snafu(display("Invalid timestamp index: {}", index))] InvalidTimestampIndex { index: usize, backtrace: Backtrace }, + + #[snafu(display("{}", msg))] + CastType { msg: String, backtrace: Backtrace }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/prelude.rs b/src/datatypes/src/prelude.rs index 88e1505fb9..d3e405204f 100644 --- a/src/datatypes/src/prelude.rs +++ b/src/datatypes/src/prelude.rs @@ -3,7 +3,7 @@ pub use crate::macros::*; pub use crate::scalars::{Scalar, ScalarRef, ScalarVector, ScalarVectorBuilder}; pub use crate::type_id::LogicalTypeId; pub use crate::types::Primitive; -pub use crate::value::Value; +pub use crate::value::{Value, ValueRef}; pub use crate::vectors::{ Helper as VectorHelper, MutableVector, Validity, Vector, VectorBuilder, VectorRef, }; diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 4d620d75df..f663f614e9 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -53,8 +53,9 @@ impl 
LogicalTypeId { LogicalTypeId::String => ConcreteDataType::string_datatype(), LogicalTypeId::Binary => ConcreteDataType::binary_datatype(), LogicalTypeId::Date => ConcreteDataType::date_datatype(), - LogicalTypeId::DateTime | LogicalTypeId::List => { - unimplemented!("Data type for {:?} is unimplemented", self) + LogicalTypeId::DateTime => ConcreteDataType::datetime_datatype(), + LogicalTypeId::List => { + ConcreteDataType::list_datatype(ConcreteDataType::null_datatype()) } } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index 764d7e963f..f3ef94306f 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -16,7 +16,7 @@ pub use list_type::ListType; pub use null_type::NullType; pub use primitive_traits::Primitive; pub use primitive_type::{ - DataTypeBuilder, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, PrimitiveElement, PrimitiveType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; pub use string_type::StringType; diff --git a/src/datatypes/src/types/binary_type.rs b/src/datatypes/src/types/binary_type.rs index e7f5625cb7..5da23b1ab1 100644 --- a/src/datatypes/src/types/binary_type.rs +++ b/src/datatypes/src/types/binary_type.rs @@ -5,8 +5,10 @@ use common_base::bytes::StringBytes; use serde::{Deserialize, Serialize}; use crate::data_type::{DataType, DataTypeRef}; +use crate::scalars::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; use crate::value::Value; +use crate::vectors::{BinaryVectorBuilder, MutableVector}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BinaryType; @@ -33,4 +35,8 @@ impl DataType for BinaryType { fn as_arrow_type(&self) -> ArrowDataType { ArrowDataType::LargeBinary } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(BinaryVectorBuilder::with_capacity(capacity)) + } } diff --git a/src/datatypes/src/types/boolean_type.rs 
b/src/datatypes/src/types/boolean_type.rs index 9ea1a01882..dea059662b 100644 --- a/src/datatypes/src/types/boolean_type.rs +++ b/src/datatypes/src/types/boolean_type.rs @@ -4,8 +4,10 @@ use arrow::datatypes::DataType as ArrowDataType; use serde::{Deserialize, Serialize}; use crate::data_type::{DataType, DataTypeRef}; +use crate::scalars::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; use crate::value::Value; +use crate::vectors::{BooleanVectorBuilder, MutableVector}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BooleanType; @@ -32,4 +34,8 @@ impl DataType for BooleanType { fn as_arrow_type(&self) -> ArrowDataType { ArrowDataType::Boolean } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(BooleanVectorBuilder::with_capacity(capacity)) + } } diff --git a/src/datatypes/src/types/date.rs b/src/datatypes/src/types/date.rs index 3f306b07b6..d2f57632bf 100644 --- a/src/datatypes/src/types/date.rs +++ b/src/datatypes/src/types/date.rs @@ -5,6 +5,8 @@ use serde::{Deserialize, Serialize}; use crate::data_type::DataType; use crate::prelude::{DataTypeRef, LogicalTypeId, Value}; +use crate::scalars::ScalarVectorBuilder; +use crate::vectors::{DateVectorBuilder, MutableVector}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DateType; @@ -25,6 +27,10 @@ impl DataType for DateType { fn as_arrow_type(&self) -> ArrowDataType { ArrowDataType::Date32 } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(DateVectorBuilder::with_capacity(capacity)) + } } impl DateType { diff --git a/src/datatypes/src/types/datetime.rs b/src/datatypes/src/types/datetime.rs index 71ecc3ff66..d12260f366 100644 --- a/src/datatypes/src/types/datetime.rs +++ b/src/datatypes/src/types/datetime.rs @@ -5,6 +5,8 @@ use serde::{Deserialize, Serialize}; use crate::data_type::{DataType, DataTypeRef}; use crate::prelude::{LogicalTypeId, Value}; +use crate::scalars::ScalarVectorBuilder; +use 
crate::vectors::{DateTimeVectorBuilder, MutableVector}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DateTimeType; @@ -28,6 +30,10 @@ impl DataType for DateTimeType { fn as_arrow_type(&self) -> ArrowDataType { ArrowDataType::Date64 } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(DateTimeVectorBuilder::with_capacity(capacity)) + } } impl DateTimeType { diff --git a/src/datatypes/src/types/list_type.rs b/src/datatypes/src/types/list_type.rs index e3afa6aafa..eccc49c6d5 100644 --- a/src/datatypes/src/types/list_type.rs +++ b/src/datatypes/src/types/list_type.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::prelude::*; use crate::value::ListValue; +use crate::vectors::{ListVectorBuilder, MutableVector}; /// Used to represent the List datatype. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -42,6 +43,13 @@ impl DataType for ListType { let field = Box::new(Field::new("item", self.inner.as_arrow_type(), true)); ArrowDataType::List(field) } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(ListVectorBuilder::with_capacity( + *self.inner.clone(), + capacity, + )) + } } #[cfg(test)] diff --git a/src/datatypes/src/types/null_type.rs b/src/datatypes/src/types/null_type.rs index 2d9c9c5092..9b18590515 100644 --- a/src/datatypes/src/types/null_type.rs +++ b/src/datatypes/src/types/null_type.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use crate::data_type::{DataType, DataTypeRef}; use crate::type_id::LogicalTypeId; use crate::value::Value; +use crate::vectors::{MutableVector, NullVectorBuilder}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct NullType; @@ -32,4 +33,8 @@ impl DataType for NullType { fn as_arrow_type(&self) -> ArrowDataType { ArrowDataType::Null } + + fn create_mutable_vector(&self, _capacity: usize) -> Box { + Box::new(NullVectorBuilder::default()) + } } diff --git 
a/src/datatypes/src/types/primitive_traits.rs b/src/datatypes/src/types/primitive_traits.rs index cfd11f3d06..be031fe1ca 100644 --- a/src/datatypes/src/types/primitive_traits.rs +++ b/src/datatypes/src/types/primitive_traits.rs @@ -3,7 +3,7 @@ use arrow::types::NativeType; use num::NumCast; use crate::prelude::Scalar; -use crate::value::Value; +use crate::value::{IntoValueRef, Value}; /// Primitive type. pub trait Primitive: @@ -12,6 +12,7 @@ pub trait Primitive: + Clone + Copy + Into + + IntoValueRef<'static> + NativeType + serde::Serialize + NativeArithmetics diff --git a/src/datatypes/src/types/primitive_type.rs b/src/datatypes/src/types/primitive_type.rs index f355f696ff..afce150fd8 100644 --- a/src/datatypes/src/types/primitive_type.rs +++ b/src/datatypes/src/types/primitive_type.rs @@ -1,14 +1,19 @@ use std::any::TypeId; use std::marker::PhantomData; +use arrow::array::PrimitiveArray; use arrow::datatypes::DataType as ArrowDataType; use paste::paste; use serde::{Deserialize, Serialize}; +use snafu::OptionExt; use crate::data_type::{ConcreteDataType, DataType}; +use crate::error::{self, Result}; +use crate::scalars::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; use crate::types::primitive_traits::Primitive; -use crate::value::Value; +use crate::value::{Value, ValueRef}; +use crate::vectors::{MutableVector, PrimitiveVector, PrimitiveVectorBuilder, Vector}; #[derive(Clone, Serialize, Deserialize)] pub struct PrimitiveType { @@ -24,22 +29,60 @@ impl PartialEq> for PrimitiveType Eq for PrimitiveType {} -/// Create a new [ConcreteDataType] from a primitive type. -pub trait DataTypeBuilder { +/// A trait that provides helper methods for a primitive type to implement the [PrimitiveVector]. +pub trait PrimitiveElement: Primitive { + /// Construct the data type struct. fn build_data_type() -> ConcreteDataType; + + /// Returns the name of the type id. fn type_name() -> String; + + /// Dynamically cast the vector to the concrete vector type. 
+ fn cast_vector(vector: &dyn Vector) -> Result<&PrimitiveArray>; + + /// Cast value ref to the primitive type. + fn cast_value_ref(value: ValueRef) -> Result>; } -macro_rules! impl_build_data_type { +macro_rules! impl_primitive_element { ($Type:ident, $TypeId:ident) => { paste::paste! { - impl DataTypeBuilder for $Type { + impl PrimitiveElement for $Type { fn build_data_type() -> ConcreteDataType { ConcreteDataType::$TypeId(PrimitiveType::<$Type>::default()) } + fn type_name() -> String { stringify!($TypeId).to_string() } + + fn cast_vector(vector: &dyn Vector) -> Result<&PrimitiveArray<$Type>> { + let primitive_vector = vector + .as_any() + .downcast_ref::>() + .with_context(|| error::CastTypeSnafu { + msg: format!( + "Failed to cast {} to vector of primitive type {}", + vector.vector_type_name(), + stringify!($TypeId) + ), + })?; + Ok(&primitive_vector.array) + } + + fn cast_value_ref(value: ValueRef) -> Result> { + match value { + ValueRef::Null => Ok(None), + ValueRef::$TypeId(v) => Ok(Some(v.into())), + other => error::CastTypeSnafu { + msg: format!( + "Failed to cast value {:?} to primitive type {}", + other, + stringify!($TypeId), + ), + }.fail(), + } + } } } }; @@ -63,6 +106,10 @@ macro_rules! impl_numeric { fn as_arrow_type(&self) -> ArrowDataType { ArrowDataType::$TypeId } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(PrimitiveVectorBuilder::<$Type>::with_capacity(capacity)) + } } impl std::fmt::Debug for PrimitiveType<$Type> { @@ -79,7 +126,7 @@ macro_rules! impl_numeric { } } - impl_build_data_type!($Type, $TypeId); + impl_primitive_element!($Type, $TypeId); paste! 
{ pub type [<$TypeId Type>]=PrimitiveType<$Type>; diff --git a/src/datatypes/src/types/string_type.rs b/src/datatypes/src/types/string_type.rs index f200957ad6..c47da8f1bd 100644 --- a/src/datatypes/src/types/string_type.rs +++ b/src/datatypes/src/types/string_type.rs @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize}; use crate::data_type::DataType; use crate::prelude::{DataTypeRef, LogicalTypeId, Value}; +use crate::scalars::ScalarVectorBuilder; +use crate::vectors::{MutableVector, StringVectorBuilder}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct StringType; @@ -32,4 +34,8 @@ impl DataType for StringType { fn as_arrow_type(&self) -> ArrowDataType { ArrowDataType::Utf8 } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(StringVectorBuilder::with_capacity(capacity)) + } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 381da8512c..aa98ba72e8 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -1,20 +1,22 @@ use std::cmp::Ordering; use common_base::bytes::{Bytes, StringBytes}; +use common_time::date::Date; +use common_time::datetime::DateTime; pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; +use crate::error::{self, Result}; use crate::prelude::*; +use crate::vectors::ListVector; pub type OrderedF32 = OrderedFloat; pub type OrderedF64 = OrderedFloat; /// Value holds a single arbitrary value of any [DataType](crate::data_type::DataType). /// -/// Although compare Value with different data type is allowed, it is recommended to only -/// compare Value with same data type. Comparing Value with different data type may not -/// behaves as what you expect. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +/// Comparison between values with different types (except Null) is not allowed. 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Value { Null, @@ -36,8 +38,8 @@ pub enum Value { Binary(Bytes), // Date & Time types: - Date(common_time::date::Date), - DateTime(common_time::datetime::DateTime), + Date(Date), + DateTime(DateTime), List(ListValue), } @@ -69,13 +71,95 @@ impl Value { } } + /// Returns true if this is a null value. pub fn is_null(&self) -> bool { matches!(self, Value::Null) } + + /// Cast itself to [ListValue]. + pub fn as_list(&self) -> Result> { + match self { + Value::Null => Ok(None), + Value::List(v) => Ok(Some(v)), + other => error::CastTypeSnafu { + msg: format!("Failed to cast {:?} to list value", other), + } + .fail(), + } + } + + /// Cast itself to [ValueRef]. + pub fn as_value_ref(&self) -> ValueRef { + match self { + Value::Null => ValueRef::Null, + Value::Boolean(v) => ValueRef::Boolean(*v), + Value::UInt8(v) => ValueRef::UInt8(*v), + Value::UInt16(v) => ValueRef::UInt16(*v), + Value::UInt32(v) => ValueRef::UInt32(*v), + Value::UInt64(v) => ValueRef::UInt64(*v), + Value::Int8(v) => ValueRef::Int8(*v), + Value::Int16(v) => ValueRef::Int16(*v), + Value::Int32(v) => ValueRef::Int32(*v), + Value::Int64(v) => ValueRef::Int64(*v), + Value::Float32(v) => ValueRef::Float32(*v), + Value::Float64(v) => ValueRef::Float64(*v), + Value::String(v) => ValueRef::String(v.as_utf8()), + Value::Binary(v) => ValueRef::Binary(v), + Value::Date(v) => ValueRef::Date(*v), + Value::DateTime(v) => ValueRef::DateTime(*v), + Value::List(v) => ValueRef::List(ListValueRef::Ref(v)), + } + } } -macro_rules! impl_from { - ($Variant:ident, $Type:ident) => { +macro_rules! 
impl_ord_for_value_like { + ($Type: ident, $left: ident, $right: ident) => { + if $left.is_null() && !$right.is_null() { + return Ordering::Less; + } else if !$left.is_null() && $right.is_null() { + return Ordering::Greater; + } else { + match ($left, $right) { + ($Type::Null, $Type::Null) => Ordering::Equal, + ($Type::Boolean(v1), $Type::Boolean(v2)) => v1.cmp(v2), + ($Type::UInt8(v1), $Type::UInt8(v2)) => v1.cmp(v2), + ($Type::UInt16(v1), $Type::UInt16(v2)) => v1.cmp(v2), + ($Type::UInt32(v1), $Type::UInt32(v2)) => v1.cmp(v2), + ($Type::UInt64(v1), $Type::UInt64(v2)) => v1.cmp(v2), + ($Type::Int8(v1), $Type::Int8(v2)) => v1.cmp(v2), + ($Type::Int16(v1), $Type::Int16(v2)) => v1.cmp(v2), + ($Type::Int32(v1), $Type::Int32(v2)) => v1.cmp(v2), + ($Type::Int64(v1), $Type::Int64(v2)) => v1.cmp(v2), + ($Type::Float32(v1), $Type::Float32(v2)) => v1.cmp(v2), + ($Type::Float64(v1), $Type::Float64(v2)) => v1.cmp(v2), + ($Type::String(v1), $Type::String(v2)) => v1.cmp(v2), + ($Type::Binary(v1), $Type::Binary(v2)) => v1.cmp(v2), + ($Type::Date(v1), $Type::Date(v2)) => v1.cmp(v2), + ($Type::DateTime(v1), $Type::DateTime(v2)) => v1.cmp(v2), + ($Type::List(v1), $Type::List(v2)) => v1.cmp(v2), + _ => panic!( + "Cannot compare different values {:?} and {:?}", + $left, $right + ), + } + } + }; +} + +impl PartialOrd for Value { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Value { + fn cmp(&self, other: &Self) -> Ordering { + impl_ord_for_value_like!(Value, self, other) + } +} + +macro_rules! impl_value_from { + ($Variant: ident, $Type: ident) => { impl From<$Type> for Value { fn from(value: $Type) -> Self { Value::$Variant(value.into()) @@ -93,19 +177,19 @@ macro_rules! 
impl_from { }; } -impl_from!(Boolean, bool); -impl_from!(UInt8, u8); -impl_from!(UInt16, u16); -impl_from!(UInt32, u32); -impl_from!(UInt64, u64); -impl_from!(Int8, i8); -impl_from!(Int16, i16); -impl_from!(Int32, i32); -impl_from!(Int64, i64); -impl_from!(Float32, f32); -impl_from!(Float64, f64); -impl_from!(String, StringBytes); -impl_from!(Binary, Bytes); +impl_value_from!(Boolean, bool); +impl_value_from!(UInt8, u8); +impl_value_from!(UInt16, u16); +impl_value_from!(UInt32, u32); +impl_value_from!(UInt64, u64); +impl_value_from!(Int8, i8); +impl_value_from!(Int16, i16); +impl_value_from!(Int32, i32); +impl_value_from!(Int64, i64); +impl_value_from!(Float32, f32); +impl_value_from!(Float64, f64); +impl_value_from!(String, StringBytes); +impl_value_from!(Binary, Bytes); impl From for Value { fn from(string: String) -> Value { @@ -159,6 +243,7 @@ impl TryFrom for serde_json::Value { } } +/// List value. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ListValue { /// List of nested Values (boxed to reduce size_of(Value)) @@ -201,10 +286,208 @@ impl Ord for ListValue { } } +/// Reference to [Value]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ValueRef<'a> { + Null, + + // Numeric types: + Boolean(bool), + UInt8(u8), + UInt16(u16), + UInt32(u32), + UInt64(u64), + Int8(i8), + Int16(i16), + Int32(i32), + Int64(i64), + Float32(OrderedF32), + Float64(OrderedF64), + + // String types: + String(&'a str), + Binary(&'a [u8]), + + // Date & Time types: + Date(Date), + DateTime(DateTime), + + List(ListValueRef<'a>), +} + +macro_rules! impl_as_for_value_ref { + ($value: ident, $Variant: ident) => { + match $value { + ValueRef::Null => Ok(None), + ValueRef::$Variant(v) => Ok(Some(*v)), + other => error::CastTypeSnafu { + msg: format!( + "Failed to cast value ref {:?} to {}", + other, + stringify!($Variant) + ), + } + .fail(), + } + }; +} + +impl<'a> ValueRef<'a> { + /// Returns true if this is null. 
+ pub fn is_null(&self) -> bool { + matches!(self, ValueRef::Null) + } + + /// Cast itself to binary slice. + pub fn as_binary(&self) -> Result> { + impl_as_for_value_ref!(self, Binary) + } + + /// Cast itself to string slice. + pub fn as_string(&self) -> Result> { + impl_as_for_value_ref!(self, String) + } + + /// Cast itself to boolean. + pub fn as_boolean(&self) -> Result> { + impl_as_for_value_ref!(self, Boolean) + } + + /// Cast itself to [Date]. + pub fn as_date(&self) -> Result> { + impl_as_for_value_ref!(self, Date) + } + + /// Cast itself to [DateTime]. + pub fn as_datetime(&self) -> Result> { + impl_as_for_value_ref!(self, DateTime) + } + + /// Cast itself to [ListValueRef]. + pub fn as_list(&self) -> Result> { + impl_as_for_value_ref!(self, List) + } +} + +impl<'a> PartialOrd for ValueRef<'a> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl<'a> Ord for ValueRef<'a> { + fn cmp(&self, other: &Self) -> Ordering { + impl_ord_for_value_like!(ValueRef, self, other) + } +} + +/// A helper trait to convert copyable types to `ValueRef`. +/// +/// It could replace the usage of `Into<ValueRef<'a>>`, thus avoiding confusion between `Into<Value>` +/// and `Into<ValueRef<'a>>` in generic code. One typical usage is the [`Primitive`](crate::types::Primitive) trait. +pub trait IntoValueRef<'a> { + /// Convert itself to [ValueRef]. + fn into_value_ref(self) -> ValueRef<'a>; +} + +macro_rules! 
impl_value_ref_from { + ($Variant:ident, $Type:ident) => { + impl From<$Type> for ValueRef<'_> { + fn from(value: $Type) -> Self { + ValueRef::$Variant(value.into()) + } + } + + impl<'a> IntoValueRef<'a> for $Type { + fn into_value_ref(self) -> ValueRef<'a> { + ValueRef::$Variant(self.into()) + } + } + + impl From> for ValueRef<'_> { + fn from(value: Option<$Type>) -> Self { + match value { + Some(v) => ValueRef::$Variant(v.into()), + None => ValueRef::Null, + } + } + } + + impl<'a> IntoValueRef<'a> for Option<$Type> { + fn into_value_ref(self) -> ValueRef<'a> { + match self { + Some(v) => ValueRef::$Variant(v.into()), + None => ValueRef::Null, + } + } + } + }; +} + +impl_value_ref_from!(Boolean, bool); +impl_value_ref_from!(UInt8, u8); +impl_value_ref_from!(UInt16, u16); +impl_value_ref_from!(UInt32, u32); +impl_value_ref_from!(UInt64, u64); +impl_value_ref_from!(Int8, i8); +impl_value_ref_from!(Int16, i16); +impl_value_ref_from!(Int32, i32); +impl_value_ref_from!(Int64, i64); +impl_value_ref_from!(Float32, f32); +impl_value_ref_from!(Float64, f64); + +impl<'a> From<&'a str> for ValueRef<'a> { + fn from(string: &'a str) -> ValueRef<'a> { + ValueRef::String(string) + } +} + +impl<'a> From<&'a [u8]> for ValueRef<'a> { + fn from(bytes: &'a [u8]) -> ValueRef<'a> { + ValueRef::Binary(bytes) + } +} + +/// Reference to a [ListValue]. +// Comparison still requires some allocation (call of `to_value()`) and might be avoidable. 
+#[derive(Debug, Clone, Copy)] +pub enum ListValueRef<'a> { + Indexed { vector: &'a ListVector, idx: usize }, + Ref(&'a ListValue), +} + +impl<'a> ListValueRef<'a> { + fn to_value(self) -> Value { + match self { + ListValueRef::Indexed { vector, idx } => vector.get(idx), + ListValueRef::Ref(v) => Value::List((*v).clone()), + } + } +} + +impl<'a> PartialEq for ListValueRef<'a> { + fn eq(&self, other: &Self) -> bool { + self.to_value().eq(&other.to_value()) + } +} + +impl<'a> Eq for ListValueRef<'a> {} + +impl<'a> Ord for ListValueRef<'a> { + fn cmp(&self, other: &Self) -> Ordering { + // Respect the order of `Value` by converting into value before comparison. + self.to_value().cmp(&other.to_value()) + } +} + +impl<'a> PartialOrd for ListValueRef<'a> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + #[cfg(test)] mod tests { - use common_time::datetime::DateTime; - use super::*; #[test] @@ -423,4 +706,165 @@ mod tests { })) ); } + + #[test] + fn test_null_value() { + assert!(Value::Null.is_null()); + assert!(!Value::Boolean(true).is_null()); + assert!(Value::Null < Value::Boolean(false)); + assert!(Value::Boolean(true) > Value::Null); + assert!(Value::Null < Value::Int32(10)); + assert!(Value::Int32(10) > Value::Null); + } + + #[test] + fn test_null_value_ref() { + assert!(ValueRef::Null.is_null()); + assert!(!ValueRef::Boolean(true).is_null()); + assert!(ValueRef::Null < ValueRef::Boolean(false)); + assert!(ValueRef::Boolean(true) > ValueRef::Null); + assert!(ValueRef::Null < ValueRef::Int32(10)); + assert!(ValueRef::Int32(10) > ValueRef::Null); + } + + #[test] + fn test_as_value_ref() { + macro_rules! 
check_as_value_ref { + ($Variant: ident, $data: expr) => { + let value = Value::$Variant($data); + let value_ref = value.as_value_ref(); + let expect_ref = ValueRef::$Variant($data); + + assert_eq!(expect_ref, value_ref); + }; + } + + assert_eq!(ValueRef::Null, Value::Null.as_value_ref()); + check_as_value_ref!(Boolean, true); + check_as_value_ref!(UInt8, 123); + check_as_value_ref!(UInt16, 123); + check_as_value_ref!(UInt32, 123); + check_as_value_ref!(UInt64, 123); + check_as_value_ref!(Int8, -12); + check_as_value_ref!(Int16, -12); + check_as_value_ref!(Int32, -12); + check_as_value_ref!(Int64, -12); + check_as_value_ref!(Float32, OrderedF32::from(16.0)); + check_as_value_ref!(Float64, OrderedF64::from(16.0)); + + assert_eq!( + ValueRef::String("hello"), + Value::String("hello".into()).as_value_ref() + ); + assert_eq!( + ValueRef::Binary(b"hello"), + Value::Binary("hello".as_bytes().into()).as_value_ref() + ); + + check_as_value_ref!(Date, Date::new(103)); + check_as_value_ref!(DateTime, DateTime::new(1034)); + + let list = ListValue { + items: None, + datatype: ConcreteDataType::int32_datatype(), + }; + assert_eq!( + ValueRef::List(ListValueRef::Ref(&list)), + Value::List(list.clone()).as_value_ref() + ); + } + + #[test] + fn test_value_ref_as() { + macro_rules! check_as_null { + ($method: ident) => { + assert_eq!(None, ValueRef::Null.$method().unwrap()); + }; + } + + check_as_null!(as_binary); + check_as_null!(as_string); + check_as_null!(as_boolean); + check_as_null!(as_date); + check_as_null!(as_datetime); + check_as_null!(as_list); + + macro_rules! 
check_as_correct { + ($data: expr, $Variant: ident, $method: ident) => { + assert_eq!(Some($data), ValueRef::$Variant($data).$method().unwrap()); + }; + } + + check_as_correct!("hello", String, as_string); + check_as_correct!("hello".as_bytes(), Binary, as_binary); + check_as_correct!(true, Boolean, as_boolean); + check_as_correct!(Date::new(123), Date, as_date); + check_as_correct!(DateTime::new(12), DateTime, as_datetime); + let list = ListValue { + items: None, + datatype: ConcreteDataType::int32_datatype(), + }; + check_as_correct!(ListValueRef::Ref(&list), List, as_list); + + let wrong_value = ValueRef::Int32(12345); + assert!(wrong_value.as_binary().is_err()); + assert!(wrong_value.as_string().is_err()); + assert!(wrong_value.as_boolean().is_err()); + assert!(wrong_value.as_date().is_err()); + assert!(wrong_value.as_datetime().is_err()); + assert!(wrong_value.as_list().is_err()); + } + + #[test] + fn test_into_value_ref() { + macro_rules! check_into_value_ref { + ($Variant: ident, $data: expr, $PrimitiveType: ident, $Wrapper: ident) => { + let data: $PrimitiveType = $data; + assert_eq!( + ValueRef::$Variant($Wrapper::from(data)), + data.into_value_ref() + ); + assert_eq!( + ValueRef::$Variant($Wrapper::from(data)), + ValueRef::from(data) + ); + assert_eq!( + ValueRef::$Variant($Wrapper::from(data)), + Some(data).into_value_ref() + ); + assert_eq!( + ValueRef::$Variant($Wrapper::from(data)), + ValueRef::from(Some(data)) + ); + let x: Option<$PrimitiveType> = None; + assert_eq!(ValueRef::Null, x.into_value_ref()); + assert_eq!(ValueRef::Null, x.into()); + }; + } + + macro_rules! 
check_primitive_into_value_ref { + ($Variant: ident, $data: expr, $PrimitiveType: ident) => { + check_into_value_ref!($Variant, $data, $PrimitiveType, $PrimitiveType) + }; + } + + check_primitive_into_value_ref!(Boolean, true, bool); + check_primitive_into_value_ref!(UInt8, 10, u8); + check_primitive_into_value_ref!(UInt16, 20, u16); + check_primitive_into_value_ref!(UInt32, 30, u32); + check_primitive_into_value_ref!(UInt64, 40, u64); + check_primitive_into_value_ref!(Int8, -10, i8); + check_primitive_into_value_ref!(Int16, -20, i16); + check_primitive_into_value_ref!(Int32, -30, i32); + check_primitive_into_value_ref!(Int64, -40, i64); + check_into_value_ref!(Float32, 10.0, f32, OrderedF32); + check_into_value_ref!(Float64, 10.0, f64, OrderedF64); + + let hello = "hello"; + assert_eq!( + ValueRef::Binary(hello.as_bytes()), + ValueRef::from(hello.as_bytes()) + ); + assert_eq!(ValueRef::String(hello), ValueRef::from(hello)); + } } diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index ee8bf88946..b95fd39aae 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::fmt::Debug; use std::sync::Arc; -use arrow::array::ArrayRef; +use arrow::array::{Array, ArrayRef}; use arrow::bitmap::Bitmap; pub use binary::*; pub use boolean::*; @@ -33,9 +33,9 @@ use snafu::ensure; pub use string::*; use crate::data_type::ConcreteDataType; -use crate::error::{BadArrayAccessSnafu, Result}; +use crate::error::{self, Result}; use crate::serialize::Serializable; -use crate::value::Value; +use crate::value::{Value, ValueRef}; #[derive(Debug, PartialEq)] pub enum Validity<'a> { @@ -80,6 +80,9 @@ pub trait Vector: Send + Sync + Serializable + Debug { /// Convert this vector to a new arrow [ArrayRef]. fn to_arrow_array(&self) -> ArrayRef; + /// Convert this vector to a new boxed arrow [Array]. + fn to_boxed_arrow_array(&self) -> Box; + /// Returns the validity of the Array. 
fn validity(&self) -> Validity; @@ -110,6 +113,10 @@ pub trait Vector: Send + Sync + Serializable + Debug { self.null_count() == self.len() } + /// Slices the `Vector`, returning a new `VectorRef`. + /// + /// # Panics + /// This function panics if `offset + length > self.len()`. fn slice(&self, offset: usize, length: usize) -> VectorRef; /// Returns the clone of value at `index`. @@ -123,7 +130,7 @@ pub trait Vector: Send + Sync + Serializable + Debug { fn try_get(&self, index: usize) -> Result { ensure!( index < self.len(), - BadArrayAccessSnafu { + error::BadArrayAccessSnafu { index, size: self.len() } @@ -134,6 +141,12 @@ pub trait Vector: Send + Sync + Serializable + Debug { // Copies each element according offsets parameter. // (i-th element should be copied offsets[i] - offsets[i - 1] times.) fn replicate(&self, offsets: &[usize]) -> VectorRef; + + /// Returns the reference of value at `index`. + /// + /// # Panics + /// Panic if `index` is out of bound. + fn get_ref(&self, index: usize) -> ValueRef; } pub type VectorRef = Arc; @@ -180,8 +193,40 @@ macro_rules! impl_get_for_vector { }; } +macro_rules! impl_get_ref_for_vector { + ($array: expr, $index: ident) => { + if $array.is_valid($index) { + // Safety: The index have been checked by `is_valid()`. + unsafe { $array.value_unchecked($index).into() } + } else { + ValueRef::Null + } + }; +} + +macro_rules! 
impl_extend_for_builder { + ($mutable_array: expr, $vector: ident, $VectorType: ident, $offset: ident, $length: ident) => {{ + use snafu::OptionExt; + + let concrete_vector = $vector + .as_any() + .downcast_ref::<$VectorType>() + .with_context(|| crate::error::CastTypeSnafu { + msg: format!( + "Failed to cast vector from {} to {}", + $vector.vector_type_name(), + stringify!($VectorType) + ), + })?; + let slice = concrete_vector.array.slice($offset, $length); + $mutable_array.extend_trusted_len(slice.iter()); + Ok(()) + }}; +} + pub(crate) use { - impl_get_for_vector, impl_try_from_arrow_array_for_vector, impl_validity_for_vector, + impl_extend_for_builder, impl_get_for_vector, impl_get_ref_for_vector, + impl_try_from_arrow_array_for_vector, impl_validity_for_vector, }; #[cfg(test)] @@ -192,7 +237,7 @@ pub mod tests { use super::helper::Helper; use super::*; use crate::data_type::DataType; - use crate::types::DataTypeBuilder; + use crate::types::PrimitiveElement; #[test] fn test_df_columns_to_vector() { diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index 5985823b2d..cd4a09e405 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -4,16 +4,14 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef}; use arrow::array::{BinaryValueIter, MutableArray}; use arrow::bitmap::utils::ZipValidity; -use snafu::OptionExt; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::arrow_array::{BinaryArray, MutableBinaryArray}; use crate::data_type::ConcreteDataType; -use crate::error::Result; -use crate::error::SerializeSnafu; +use crate::error::{self, Result}; use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; -use crate::value::Value; +use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// Vector of binary strings. 
@@ -57,6 +55,10 @@ impl Vector for BinaryVector { Arc::new(self.array.clone()) } + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + fn validity(&self) -> Validity { vectors::impl_validity_for_vector!(self.array) } @@ -80,6 +82,10 @@ impl Vector for BinaryVector { fn replicate(&self, offsets: &[usize]) -> VectorRef { common::replicate_scalar_vector(self, offsets) } + + fn get_ref(&self, index: usize) -> ValueRef { + vectors::impl_get_ref_for_vector!(self.array, index) + } } impl ScalarVector for BinaryVector { @@ -125,6 +131,15 @@ impl MutableVector for BinaryVectorBuilder { fn to_vector(&mut self) -> VectorRef { Arc::new(self.finish()) } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + self.mutable_array.push(value.as_binary()?); + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + vectors::impl_extend_for_builder!(self.mutable_array, vector, BinaryVector, offset, length) + } } impl ScalarVectorBuilder for BinaryVectorBuilder { @@ -155,7 +170,7 @@ impl Serializable for BinaryVector { Some(vec) => serde_json::to_value(vec), }) .collect::>() - .context(SerializeSnafu) + .context(error::SerializeSnafu) } } @@ -169,11 +184,13 @@ mod tests { use super::*; use crate::arrow_array::BinaryArray; + use crate::data_type::DataType; use crate::serialize::Serializable; + use crate::types::BinaryType; #[test] fn test_binary_vector_misc() { - let v = BinaryVector::from(BinaryArray::from_slice(&vec![vec![1, 2, 3], vec![1, 2, 3]])); + let v = BinaryVector::from(BinaryArray::from_slice(&[vec![1, 2, 3], vec![1, 2, 3]])); assert_eq!(2, v.len()); assert_eq!("BinaryVector", v.vector_type_name()); @@ -185,6 +202,7 @@ mod tests { for i in 0..2 { assert!(!v.is_null(i)); assert_eq!(Value::Binary(Bytes::from(vec![1, 2, 3])), v.get(i)); + assert_eq!(ValueRef::Binary(&[1, 2, 3]), v.get_ref(i)); } let arrow_arr = v.to_arrow_array(); @@ -194,8 +212,7 @@ mod tests { #[test] fn 
test_serialize_binary_vector_to_json() { - let vector = - BinaryVector::from(BinaryArray::from_slice(&vec![vec![1, 2, 3], vec![1, 2, 3]])); + let vector = BinaryVector::from(BinaryArray::from_slice(&[vec![1, 2, 3], vec![1, 2, 3]])); let json_value = vector.serialize_to_json().unwrap(); assert_eq!( @@ -221,7 +238,7 @@ mod tests { #[test] fn test_from_arrow_array() { - let arrow_array = BinaryArray::from_slice(&vec![vec![1, 2, 3], vec![1, 2, 3]]); + let arrow_array = BinaryArray::from_slice(&[vec![1, 2, 3], vec![1, 2, 3]]); let original = arrow_array.clone(); let vector = BinaryVector::from(arrow_array); assert_eq!(original, vector.array); @@ -270,4 +287,23 @@ mod tests { assert_eq!(1, slots.null_count()); assert!(!slots.get_bit(1)); } + + #[test] + fn test_binary_vector_builder() { + let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]); + + let mut builder = BinaryType::default().create_mutable_vector(3); + builder + .push_value_ref(ValueRef::Binary("hello".as_bytes())) + .unwrap(); + assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(BinaryVector::from_slice(&[b"hello", b"one", b"two"])); + assert_eq!(expect, vector); + } } diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index 85d926a324..3cc682d6dc 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -4,15 +4,14 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, BooleanArray, MutableArray, MutableBooleanArray}; use arrow::bitmap::utils::{BitmapIter, ZipValidity}; -use snafu::OptionExt; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::data_type::ConcreteDataType; use crate::error::Result; use crate::scalars::common::replicate_scalar_vector; use 
crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; -use crate::value::Value; +use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// Vector of boolean. @@ -72,6 +71,10 @@ impl Vector for BooleanVector { Arc::new(self.array.clone()) } + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + fn validity(&self) -> Validity { vectors::impl_validity_for_vector!(self.array) } @@ -95,6 +98,10 @@ impl Vector for BooleanVector { fn replicate(&self, offsets: &[usize]) -> VectorRef { replicate_scalar_vector(self, offsets) } + + fn get_ref(&self, index: usize) -> ValueRef { + vectors::impl_get_ref_for_vector!(self.array, index) + } } impl ScalarVector for BooleanVector { @@ -140,6 +147,15 @@ impl MutableVector for BooleanVectorBuilder { fn to_vector(&mut self) -> VectorRef { Arc::new(self.finish()) } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + self.mutable_array.push(value.as_boolean()?); + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + vectors::impl_extend_for_builder!(self.mutable_array, vector, BooleanVector, offset, length) + } } impl ScalarVectorBuilder for BooleanVectorBuilder { @@ -179,7 +195,9 @@ mod tests { use serde_json; use super::*; + use crate::data_type::DataType; use crate::serialize::Serializable; + use crate::types::BooleanType; #[test] fn test_boolean_vector_misc() { @@ -195,6 +213,7 @@ mod tests { for (i, b) in bools.iter().enumerate() { assert!(!v.is_null(i)); assert_eq!(Value::Boolean(*b), v.get(i)); + assert_eq!(ValueRef::Boolean(*b), v.get_ref(i)); } let arrow_arr = v.to_arrow_array(); @@ -286,4 +305,21 @@ mod tests { assert_eq!(0, vector.null_count()); assert_eq!(Validity::AllValid, vector.validity()); } + + #[test] + fn test_boolean_vector_builder() { + let input = BooleanVector::from_slice(&[true, false, true]); + + let mut builder = 
BooleanType::default().create_mutable_vector(3); + builder.push_value_ref(ValueRef::Boolean(true)).unwrap(); + assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(BooleanVector::from_slice(&[true, false, true])); + assert_eq!(expect, vector); + } } diff --git a/src/datatypes/src/vectors/constant.rs b/src/datatypes/src/vectors/constant.rs index 09dfd197a8..fa8cdb02af 100644 --- a/src/datatypes/src/vectors/constant.rs +++ b/src/datatypes/src/vectors/constant.rs @@ -2,13 +2,13 @@ use std::any::Any; use std::fmt; use std::sync::Arc; -use arrow::array::ArrayRef; +use arrow::array::{Array, ArrayRef}; use snafu::ResultExt; use crate::data_type::ConcreteDataType; use crate::error::{Result, SerializeSnafu}; use crate::serialize::Serializable; -use crate::value::Value; +use crate::value::{Value, ValueRef}; use crate::vectors::Helper; use crate::vectors::{Validity, Vector, VectorRef}; @@ -55,6 +55,11 @@ impl Vector for ConstantVector { v.to_arrow_array() } + fn to_boxed_arrow_array(&self) -> Box { + let v = self.vector.replicate(&[self.length]); + v.to_boxed_arrow_array() + } + fn is_const(&self) -> bool { true } @@ -98,6 +103,10 @@ impl Vector for ConstantVector { Arc::new(Self::new(self.vector.clone(), *offsets.last().unwrap())) } + + fn get_ref(&self, _index: usize) -> ValueRef { + self.vector.get_ref(0) + } } impl fmt::Debug for ConstantVector { diff --git a/src/datatypes/src/vectors/date.rs b/src/datatypes/src/vectors/date.rs index b8e125e37c..2c0e1e83ee 100644 --- a/src/datatypes/src/vectors/date.rs +++ b/src/datatypes/src/vectors/date.rs @@ -6,8 +6,8 @@ use common_time::date::Date; use snafu::OptionExt; use crate::data_type::ConcreteDataType; -use crate::error::ConversionSnafu; -use crate::prelude::{ScalarVectorBuilder, Validity, 
Value, Vector, VectorRef}; +use crate::error::{self, Result}; +use crate::prelude::*; use crate::scalars::ScalarVector; use crate::serialize::Serializable; use crate::vectors::{MutableVector, PrimitiveIter, PrimitiveVector, PrimitiveVectorBuilder}; @@ -24,13 +24,13 @@ impl DateVector { } } - pub fn try_from_arrow_array(array: impl AsRef) -> crate::error::Result { + pub fn try_from_arrow_array(array: impl AsRef) -> Result { Ok(Self::new( array .as_ref() .as_any() .downcast_ref::>() - .with_context(|| ConversionSnafu { + .with_context(|| error::ConversionSnafu { from: format!("{:?}", array.as_ref().data_type()), })? .clone(), @@ -65,6 +65,16 @@ impl Vector for DateVector { )) } + fn to_boxed_arrow_array(&self) -> Box { + let validity = self.array.array.validity().cloned(); + let buffer = self.array.array.values().clone(); + Box::new(PrimitiveArray::new( + arrow::datatypes::DataType::Date32, + buffer, + validity, + )) + } + fn validity(&self) -> Validity { self.array.validity() } @@ -94,6 +104,16 @@ impl Vector for DateVector { fn replicate(&self, offsets: &[usize]) -> VectorRef { self.array.replicate(offsets) } + + fn get_ref(&self, index: usize) -> ValueRef { + match self.array.get(index) { + Value::Int32(v) => ValueRef::Date(Date::new(v)), + Value::Null => ValueRef::Null, + _ => { + unreachable!() + } + } + } } impl From>> for DateVector { @@ -135,7 +155,7 @@ impl ScalarVector for DateVector { } impl Serializable for DateVector { - fn serialize_to_json(&self) -> crate::error::Result> { + fn serialize_to_json(&self) -> Result> { Ok(self .array .iter_data() @@ -172,6 +192,26 @@ impl MutableVector for DateVectorBuilder { fn to_vector(&mut self) -> VectorRef { Arc::new(self.finish()) } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + self.buffer.push(value.as_date()?.map(|d| d.val())); + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + let concrete_vector = vector + .as_any() + 
.downcast_ref::() + .with_context(|| error::CastTypeSnafu { + msg: format!( + "Failed to convert vector from {} to DateVector", + vector.vector_type_name() + ), + })?; + self.buffer + .extend_slice_of(&concrete_vector.array, offset, length)?; + Ok(()) + } } impl ScalarVectorBuilder for DateVectorBuilder { @@ -197,17 +237,23 @@ impl ScalarVectorBuilder for DateVectorBuilder { #[cfg(test)] mod tests { use super::*; + use crate::data_type::DataType; + use crate::types::DateType; #[test] - pub fn test_build_date_vector() { + fn test_build_date_vector() { let mut builder = DateVectorBuilder::with_capacity(4); builder.push(Some(Date::new(1))); builder.push(None); builder.push(Some(Date::new(-1))); let vector = builder.finish(); assert_eq!(3, vector.len()); + assert_eq!(Value::Date(Date::new(1)), vector.get(0)); + assert_eq!(ValueRef::Date(Date::new(1)), vector.get_ref(0)); assert_eq!(Some(Date::new(1)), vector.get_data(0)); assert_eq!(None, vector.get_data(1)); + assert_eq!(Value::Null, vector.get(1)); + assert_eq!(ValueRef::Null, vector.get_ref(1)); assert_eq!(Some(Date::new(-1)), vector.get_data(2)); let mut iter = vector.iter_data(); assert_eq!(Some(Date::new(1)), iter.next().unwrap()); @@ -216,10 +262,33 @@ mod tests { } #[test] - pub fn test_date_scalar() { + fn test_date_scalar() { let vector = DateVector::from_slice(&[Date::new(1), Date::new(2)]); assert_eq!(2, vector.len()); assert_eq!(Some(Date::new(1)), vector.get_data(0)); assert_eq!(Some(Date::new(2)), vector.get_data(1)); } + + #[test] + fn test_date_vector_builder() { + let input = DateVector::from_slice(&[Date::new(1), Date::new(2), Date::new(3)]); + + let mut builder = DateType::default().create_mutable_vector(3); + builder + .push_value_ref(ValueRef::Date(Date::new(5))) + .unwrap(); + assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + 
let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(DateVector::from_slice(&[ + Date::new(5), + Date::new(2), + Date::new(3), + ])); + assert_eq!(expect, vector); + } } diff --git a/src/datatypes/src/vectors/datetime.rs b/src/datatypes/src/vectors/datetime.rs index cd52ca8a23..1106b1e9f2 100644 --- a/src/datatypes/src/vectors/datetime.rs +++ b/src/datatypes/src/vectors/datetime.rs @@ -6,9 +6,9 @@ use common_time::datetime::DateTime; use snafu::OptionExt; use crate::data_type::ConcreteDataType; -use crate::error::ConversionSnafu; +use crate::error::{self, Result}; use crate::prelude::{ - MutableVector, ScalarVector, ScalarVectorBuilder, Validity, Value, Vector, VectorRef, + MutableVector, ScalarVector, ScalarVectorBuilder, Validity, Value, ValueRef, Vector, VectorRef, }; use crate::serialize::Serializable; use crate::vectors::{PrimitiveIter, PrimitiveVector, PrimitiveVectorBuilder}; @@ -25,13 +25,13 @@ impl DateTimeVector { } } - pub fn try_from_arrow_array(array: impl AsRef) -> crate::error::Result { + pub fn try_from_arrow_array(array: impl AsRef) -> Result { Ok(Self::new( array .as_ref() .as_any() .downcast_ref::>() - .with_context(|| ConversionSnafu { + .with_context(|| error::ConversionSnafu { from: format!("{:?}", array.as_ref().data_type()), })? 
.clone(), @@ -66,6 +66,16 @@ impl Vector for DateTimeVector { )) } + fn to_boxed_arrow_array(&self) -> Box { + let validity = self.array.array.validity().cloned(); + let buffer = self.array.array.values().clone(); + Box::new(PrimitiveArray::new( + arrow::datatypes::DataType::Date64, + buffer, + validity, + )) + } + fn validity(&self) -> Validity { self.array.validity() } @@ -95,6 +105,16 @@ impl Vector for DateTimeVector { fn replicate(&self, offsets: &[usize]) -> VectorRef { self.array.replicate(offsets) } + + fn get_ref(&self, index: usize) -> ValueRef { + match self.array.get(index) { + Value::Int64(v) => ValueRef::DateTime(DateTime::new(v)), + Value::Null => ValueRef::Null, + _ => { + unreachable!() + } + } + } } impl Serializable for DateTimeVector { @@ -163,6 +183,26 @@ impl MutableVector for DateTimeVectorBuilder { fn to_vector(&mut self) -> VectorRef { Arc::new(self.finish()) } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + self.buffer.push(value.as_datetime()?.map(|d| d.val())); + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + let concrete_vector = vector + .as_any() + .downcast_ref::() + .with_context(|| error::CastTypeSnafu { + msg: format!( + "Failed to convert vector from {} to DateVector", + vector.vector_type_name() + ), + })?; + self.buffer + .extend_slice_of(&concrete_vector.array, offset, length)?; + Ok(()) + } } pub struct DateTimeIter<'a> { @@ -199,9 +239,11 @@ mod tests { use std::assert_matches::assert_matches; use super::*; + use crate::data_type::DataType; + use crate::types::DateTimeType; #[test] - pub fn test_datetime_vector() { + fn test_datetime_vector() { let v = DateTimeVector::new(PrimitiveArray::from_vec(vec![1, 2, 3])); assert_eq!(ConcreteDataType::datetime_datatype(), v.data_type()); assert_eq!(3, v.len()); @@ -210,6 +252,11 @@ mod tests { &arrow::datatypes::DataType::Date64, v.to_arrow_array().data_type() ); + + 
assert_eq!(Some(DateTime::new(1)), v.get_data(0)); + assert_eq!(Value::DateTime(DateTime::new(1)), v.get(0)); + assert_eq!(ValueRef::DateTime(DateTime::new(1)), v.get_ref(0)); + let mut iter = v.iter_data(); assert_eq!(Some(DateTime::new(1)), iter.next().unwrap()); assert_eq!(Some(DateTime::new(2)), iter.next().unwrap()); @@ -230,7 +277,7 @@ mod tests { } #[test] - pub fn test_datetime_vector_builder() { + fn test_datetime_vector_builder() { let mut builder = DateTimeVectorBuilder::with_capacity(3); builder.push(Some(DateTime::new(1))); builder.push(None); @@ -241,5 +288,26 @@ mod tests { assert_eq!(Value::DateTime(DateTime::new(1)), v.get(0)); assert_eq!(Value::Null, v.get(1)); assert_eq!(Value::DateTime(DateTime::new(-1)), v.get(2)); + + let input = + DateTimeVector::from_slice(&[DateTime::new(1), DateTime::new(2), DateTime::new(3)]); + + let mut builder = DateTimeType::default().create_mutable_vector(3); + builder + .push_value_ref(ValueRef::DateTime(DateTime::new(5))) + .unwrap(); + assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(DateTimeVector::from_slice(&[ + DateTime::new(5), + DateTime::new(2), + DateTime::new(3), + ])); + assert_eq!(expect, vector); } } diff --git a/src/datatypes/src/vectors/list.rs b/src/datatypes/src/vectors/list.rs index a8ed1878f8..6349866481 100644 --- a/src/datatypes/src/vectors/list.rs +++ b/src/datatypes/src/vectors/list.rs @@ -2,6 +2,7 @@ use std::any::Any; use std::sync::Arc; use arrow::array::{Array, ArrayRef, ListArray}; +use arrow::bitmap::MutableBitmap; use arrow::datatypes::DataType as ArrowDataType; use serde_json::Value as JsonValue; use snafu::prelude::*; @@ -10,7 +11,7 @@ use crate::error::Result; use crate::prelude::*; use crate::serialize::Serializable; use 
crate::types::ListType; -use crate::value::ListValue; +use crate::value::{ListValue, ListValueRef}; use crate::vectors::{impl_try_from_arrow_array_for_vector, impl_validity_for_vector}; type ArrowListArray = ListArray; @@ -49,6 +50,10 @@ impl Vector for ListVector { Arc::new(self.array.clone()) } + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + fn validity(&self) -> Validity { impl_validity_for_vector!(self.array) } @@ -94,6 +99,13 @@ impl Vector for ListVector { // Refer to Databend's `ArrayColumn` for more details. unimplemented!() } + + fn get_ref(&self, index: usize) -> ValueRef { + ValueRef::List(ListValueRef::Indexed { + vector: self, + idx: index, + }) + } } impl Serializable for ListVector { @@ -125,6 +137,134 @@ impl From for ListVector { impl_try_from_arrow_array_for_vector!(ArrowListArray, ListVector); +// Some codes are ported from arrow2's MutableListArray. +pub struct ListVectorBuilder { + inner_type: ConcreteDataType, + offsets: Vec, + values: Box, + validity: Option, +} + +impl ListVectorBuilder { + pub fn with_capacity(inner_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder { + let mut offsets = Vec::with_capacity(capacity + 1); + offsets.push(0); + // The actual required capacity might greater than the capacity of the `ListVector` + // if there exists child vector that has more than one element. 
+ let values = inner_type.create_mutable_vector(capacity); + + ListVectorBuilder { + inner_type, + offsets, + values, + validity: None, + } + } + + #[inline] + fn last_offset(&self) -> i32 { + *self.offsets.last().unwrap() + } + + fn push_null(&mut self) { + self.offsets.push(self.last_offset()); + match &mut self.validity { + Some(validity) => validity.push(false), + None => self.init_validity(), + } + } + + fn init_validity(&mut self) { + let len = self.offsets.len() - 1; + + let mut validity = MutableBitmap::with_capacity(self.offsets.capacity()); + validity.extend_constant(len, true); + validity.set(len - 1, false); + self.validity = Some(validity) + } + + fn push_list_value(&mut self, list_value: &ListValue) -> Result<()> { + if let Some(items) = list_value.items() { + for item in &**items { + self.values.push_value_ref(item.as_value_ref())?; + } + } + self.push_valid(); + Ok(()) + } + + /// Needs to be called when a valid value was extended to this builder. + fn push_valid(&mut self) { + let size = self.values.len(); + let size = i32::try_from(size).unwrap(); + assert!(size >= *self.offsets.last().unwrap()); + + self.offsets.push(size); + if let Some(validity) = &mut self.validity { + validity.push(true) + } + } +} + +impl MutableVector for ListVectorBuilder { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::list_datatype(self.inner_type.clone()) + } + + fn len(&self) -> usize { + self.offsets.len() - 1 + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn to_vector(&mut self) -> VectorRef { + let array = ArrowListArray::try_new( + ConcreteDataType::list_datatype(self.inner_type.clone()).as_arrow_type(), + std::mem::take(&mut self.offsets).into(), + self.values.to_vector().to_arrow_array(), + std::mem::take(&mut self.validity).map(|x| x.into()), + ) + .unwrap(); // The `ListVectorBuilder` itself should ensure it always builds a valid array. 
+ + let vector = ListVector { + array, + inner_data_type: self.inner_type.clone(), + }; + Arc::new(vector) + } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + if let Some(list_ref) = value.as_list()? { + match list_ref { + ListValueRef::Indexed { vector, idx } => match vector.get(idx).as_list()? { + Some(list_value) => self.push_list_value(list_value)?, + None => self.push_null(), + }, + ListValueRef::Ref(list_value) => self.push_list_value(list_value)?, + } + } else { + self.push_null(); + } + + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + for idx in offset..offset + length { + let value = vector.get_ref(idx); + self.push_value_ref(value)?; + } + + Ok(()) + } +} + #[cfg(test)] mod tests { use arrow::array::{MutableListArray, MutablePrimitiveArray, TryExtend}; @@ -194,6 +334,17 @@ mod tests { )), list_vector.get(0) ); + let value_ref = list_vector.get_ref(0); + assert!(matches!( + value_ref, + ValueRef::List(ListValueRef::Indexed { .. }) + )); + let value_ref = list_vector.get_ref(1); + if let ValueRef::List(ListValueRef::Indexed { idx, .. 
}) = value_ref { + assert_eq!(1, idx); + } else { + unreachable!() + } assert_eq!(Value::Null, list_vector.get(1)); assert_eq!( Value::List(ListValue::new( @@ -240,10 +391,11 @@ mod tests { arrow_array.try_extend(data).unwrap(); let arrow_array: ArrowListArray = arrow_array.into(); - let list_vector = ListVector { - array: arrow_array, - inner_data_type: ConcreteDataType::int32_datatype(), - }; + let list_vector = ListVector::from(arrow_array); + assert_eq!( + ConcreteDataType::List(ListType::new(ConcreteDataType::int64_datatype())), + list_vector.data_type() + ); let mut iter = list_vector.values_iter(); assert_eq!( "Int64[1, 2, 3]", @@ -272,13 +424,54 @@ mod tests { arrow_array.try_extend(data).unwrap(); let arrow_array: ArrowListArray = arrow_array.into(); - let list_vector = ListVector { - array: arrow_array, - inner_data_type: ConcreteDataType::int32_datatype(), - }; + let list_vector = ListVector::from(arrow_array); assert_eq!( "Ok([Array([Number(1), Number(2), Number(3)]), Null, Array([Number(4), Null, Number(6)])])", format!("{:?}", list_vector.serialize_to_json()) ); } + + fn new_list_vector(data: Vec>>>) -> ListVector { + let mut arrow_array = MutableListArray::>::new(); + arrow_array.try_extend(data).unwrap(); + let arrow_array: ArrowListArray = arrow_array.into(); + + ListVector::from(arrow_array) + } + + #[test] + fn test_list_vector_builder() { + let mut builder = + ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3); + builder + .push_value_ref(ValueRef::List(ListValueRef::Ref(&ListValue::new( + Some(Box::new(vec![ + Value::Int32(4), + Value::Null, + Value::Int32(6), + ])), + ConcreteDataType::int32_datatype(), + )))) + .unwrap(); + assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + + let data = vec![ + Some(vec![Some(1), Some(2), Some(3)]), + None, + Some(vec![Some(7), Some(8), None]), + ]; + let input = new_list_vector(data); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + 
.extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(new_list_vector(vec![ + Some(vec![Some(4), None, Some(6)]), + None, + Some(vec![Some(7), Some(8), None]), + ])); + assert_eq!(expect, vector); + } } diff --git a/src/datatypes/src/vectors/mutable.rs b/src/datatypes/src/vectors/mutable.rs index ff9ed18a38..1dd6ff78a3 100644 --- a/src/datatypes/src/vectors/mutable.rs +++ b/src/datatypes/src/vectors/mutable.rs @@ -1,19 +1,40 @@ use std::any::Any; +use crate::error::Result; use crate::prelude::*; +/// Mutable vector that could be used to build an immutable vector. pub trait MutableVector: Send + Sync { + /// Returns the data type of the vector. fn data_type(&self) -> ConcreteDataType; + /// Returns the length of the vector. fn len(&self) -> usize; + /// Returns whether the vector is empty. fn is_empty(&self) -> bool { self.len() == 0 } + /// Convert to Any, to enable dynamic casting. fn as_any(&self) -> &dyn Any; + /// Convert to mutable Any, to enable dynamic casting. fn as_mut_any(&mut self) -> &mut dyn Any; + /// Convert `self` to an (immutable) [VectorRef] and reset `self`. fn to_vector(&mut self) -> VectorRef; + + /// Push value ref to this mutable vector. + /// + /// Returns error if data type unmatch. + fn push_value_ref(&mut self, value: ValueRef) -> Result<()>; + + /// Extend this mutable vector by slice of `vector`. + /// + /// Returns error if data type unmatch. + /// + /// # Panics + /// Panics if `offset + length > vector.len()`. 
+ fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()>; } diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index 0f30b5fb47..2f483b0ea5 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -5,15 +5,14 @@ use std::sync::Arc; use arrow::array::ArrayRef; use arrow::array::{Array, NullArray}; use arrow::datatypes::DataType as ArrowDataType; -use snafu::OptionExt; +use snafu::{ensure, OptionExt}; use crate::data_type::ConcreteDataType; -use crate::error::Result; +use crate::error::{self, Result}; use crate::serialize::Serializable; use crate::types::NullType; -use crate::value::Value; -use crate::vectors::impl_try_from_arrow_array_for_vector; -use crate::vectors::{Validity, Vector, VectorRef}; +use crate::value::{Value, ValueRef}; +use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; #[derive(PartialEq)] pub struct NullVector { @@ -55,6 +54,10 @@ impl Vector for NullVector { Arc::new(self.array.clone()) } + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + fn validity(&self) -> Validity { Validity::AllNull } @@ -90,6 +93,11 @@ impl Vector for NullVector { array: NullArray::new(ArrowDataType::Null, *offsets.last().unwrap() as usize), }) } + + fn get_ref(&self, _index: usize) -> ValueRef { + // Skips bound check for null array. 
+ ValueRef::Null + } } impl fmt::Debug for NullVector { @@ -106,13 +114,77 @@ impl Serializable for NullVector { } } -impl_try_from_arrow_array_for_vector!(NullArray, NullVector); +vectors::impl_try_from_arrow_array_for_vector!(NullArray, NullVector); + +#[derive(Default)] +pub struct NullVectorBuilder { + length: usize, +} + +impl MutableVector for NullVectorBuilder { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::null_datatype() + } + + fn len(&self) -> usize { + self.length + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn to_vector(&mut self) -> VectorRef { + let vector = Arc::new(NullVector::new(self.length)); + self.length = 0; + vector + } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + ensure!( + value.is_null(), + error::CastTypeSnafu { + msg: format!("Failed to cast value ref {:?} to null", value), + } + ); + + self.length += 1; + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + vector + .as_any() + .downcast_ref::() + .with_context(|| error::CastTypeSnafu { + msg: format!( + "Failed to convert vector from {} to NullVector", + vector.vector_type_name() + ), + })?; + assert!( + offset + length <= vector.len(), + "offset {} + length {} must less than {}", + offset, + length, + vector.len() + ); + + self.length += length; + Ok(()) + } +} #[cfg(test)] mod tests { use serde_json; use super::*; + use crate::data_type::DataType; #[test] fn test_null_vector_misc() { @@ -135,6 +207,7 @@ mod tests { for i in 0..32 { assert!(v.is_null(i)); assert_eq!(Value::Null, v.get(i)); + assert_eq!(ValueRef::Null, v.get_ref(i)); } } @@ -160,4 +233,21 @@ mod tests { assert_eq!(Validity::AllNull, vector.validity()); assert_eq!(5, vector.null_count()); } + + #[test] + fn test_null_vector_builder() { + let mut builder = NullType::default().create_mutable_vector(3); + builder.push_value_ref(ValueRef::Null).unwrap(); + 
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + + let input = NullVector::new(3); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(input); + assert_eq!(expect, vector); + } } diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index b0a9085197..642cee4940 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -14,8 +14,8 @@ use crate::error::{Result, SerializeSnafu}; use crate::scalars::{Scalar, ScalarRef}; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; -use crate::types::{DataTypeBuilder, Primitive}; -use crate::value::Value; +use crate::types::{Primitive, PrimitiveElement}; +use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// Vector for primitive data types. @@ -61,7 +61,7 @@ impl PrimitiveVector { } } -impl Vector for PrimitiveVector { +impl Vector for PrimitiveVector { fn data_type(&self) -> ConcreteDataType { T::build_data_type() } @@ -82,6 +82,10 @@ impl Vector for PrimitiveVector { Arc::new(self.array.clone()) } + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + fn validity(&self) -> Validity { vectors::impl_validity_for_vector!(self.array) } @@ -128,6 +132,15 @@ impl Vector for PrimitiveVector { } builder.to_vector() } + + fn get_ref(&self, index: usize) -> ValueRef { + if self.array.is_valid(index) { + // Safety: The index have been checked by `is_valid()`. 
+ unsafe { self.array.value_unchecked(index).into_value_ref() } + } else { + ValueRef::Null + } + } } impl From> for PrimitiveVector { @@ -154,7 +167,7 @@ impl>> FromIterator for Pr impl ScalarVector for PrimitiveVector where - T: Scalar + Primitive + DataTypeBuilder, + T: Scalar + PrimitiveElement, for<'a> T: ScalarRef<'a, ScalarType = T, VectorType = Self>, for<'a> T: Scalar = T>, { @@ -203,11 +216,11 @@ impl<'a, T: Copy> Iterator for PrimitiveIter<'a, T> { } } -pub struct PrimitiveVectorBuilder { +pub struct PrimitiveVectorBuilder { pub(crate) mutable_array: MutablePrimitiveArray, } -impl PrimitiveVectorBuilder { +impl PrimitiveVectorBuilder { fn with_capacity(capacity: usize) -> Self { Self { mutable_array: MutablePrimitiveArray::with_capacity(capacity), @@ -228,7 +241,7 @@ pub type Int64VectorBuilder = PrimitiveVectorBuilder; pub type Float32VectorBuilder = PrimitiveVectorBuilder; pub type Float64VectorBuilder = PrimitiveVectorBuilder; -impl MutableVector for PrimitiveVectorBuilder { +impl MutableVector for PrimitiveVectorBuilder { fn data_type(&self) -> ConcreteDataType { T::build_data_type() } @@ -250,11 +263,25 @@ impl MutableVector for PrimitiveVectorBuilder array: std::mem::take(&mut self.mutable_array).into(), }) } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + let primitive = T::cast_value_ref(value)?; + self.mutable_array.push(primitive); + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + let primitive = T::cast_vector(vector)?; + // Slice the underlying array to avoid creating a new Arc. 
+ let slice = primitive.slice(offset, length); + self.mutable_array.extend_trusted_len(slice.iter()); + Ok(()) + } } impl ScalarVectorBuilder for PrimitiveVectorBuilder where - T: Scalar> + Primitive + DataTypeBuilder, + T: Scalar> + PrimitiveElement, for<'a> T: ScalarRef<'a, ScalarType = T, VectorType = PrimitiveVector>, for<'a> T: Scalar = T>, { @@ -277,7 +304,7 @@ where } } -impl Serializable for PrimitiveVector { +impl Serializable for PrimitiveVector { fn serialize_to_json(&self) -> Result> { self.array .iter() @@ -293,7 +320,9 @@ mod tests { use serde_json; use super::*; + use crate::data_type::DataType; use crate::serialize::Serializable; + use crate::types::Int64Type; fn check_vec(v: PrimitiveVector) { assert_eq!(4, v.len()); @@ -305,6 +334,7 @@ mod tests { for i in 0..4 { assert!(!v.is_null(i)); assert_eq!(Value::Int32(i as i32 + 1), v.get(i)); + assert_eq!(ValueRef::Int32(i as i32 + 1), v.get_ref(i)); } let json_value = v.serialize_to_json().unwrap(); @@ -416,4 +446,21 @@ mod tests { let v = PrimitiveVector::::from(vec![Some(0i64), Some(1i64), Some(2i64), None, None]); assert_eq!(40, v.memory_size()); } + + #[test] + fn test_primitive_vector_builder() { + let mut builder = Int64Type::default().create_mutable_vector(3); + builder.push_value_ref(ValueRef::Int64(123)).unwrap(); + assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + + let input = Int64Vector::from_slice(&[7, 8, 9]); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + .extend_slice_of(&Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(Int64Vector::from_slice(&[123, 8, 9])); + assert_eq!(expect, vector); + } } diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs index d7d99dd583..02664d1e26 100644 --- a/src/datatypes/src/vectors/string.rs +++ b/src/datatypes/src/vectors/string.rs @@ -4,16 +4,15 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, 
MutableArray, Utf8ValuesIter}; use arrow::bitmap::utils::ZipValidity; use serde_json::Value as JsonValue; -use snafu::OptionExt; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::arrow_array::{MutableStringArray, StringArray}; use crate::data_type::ConcreteDataType; -use crate::error::SerializeSnafu; +use crate::error::{Result, SerializeSnafu}; use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::types::StringType; -use crate::value::Value; +use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// String array wrapper @@ -89,6 +88,10 @@ impl Vector for StringVector { Arc::new(self.array.clone()) } + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + fn validity(&self) -> Validity { vectors::impl_validity_for_vector!(self.array) } @@ -112,6 +115,10 @@ impl Vector for StringVector { fn replicate(&self, offsets: &[usize]) -> VectorRef { common::replicate_scalar_vector(self, offsets) } + + fn get_ref(&self, index: usize) -> ValueRef { + vectors::impl_get_ref_for_vector!(self.array, index) + } } impl ScalarVector for StringVector { @@ -157,6 +164,15 @@ impl MutableVector for StringVectorBuilder { fn to_vector(&mut self) -> VectorRef { Arc::new(self.finish()) } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + self.buffer.push(value.as_string()?); + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + vectors::impl_extend_for_builder!(self.buffer, vector, StringVector, offset, length) + } } impl ScalarVectorBuilder for StringVectorBuilder { @@ -199,6 +215,7 @@ mod tests { use serde_json; use super::*; + use crate::data_type::DataType; #[test] fn test_string_vector_misc() { @@ -213,6 +230,7 @@ mod tests { for (i, s) in strs.iter().enumerate() { assert_eq!(Value::from(*s), v.get(i)); + assert_eq!(ValueRef::from(*s), v.get_ref(i)); 
assert_eq!(Value::from(*s), v.try_get(i).unwrap()); } @@ -273,4 +291,21 @@ mod tests { assert_eq!("world", iter.next().unwrap().unwrap()); assert_eq!(None, iter.next()); } + + #[test] + fn test_string_vector_builder() { + let mut builder = StringType::default().create_mutable_vector(3); + builder.push_value_ref(ValueRef::String("hello")).unwrap(); + assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + + let input = StringVector::from_slice(&["world", "one", "two"]); + builder.extend_slice_of(&input, 1, 2).unwrap(); + assert!(builder + .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) + .is_err()); + let vector = builder.to_vector(); + + let expect: VectorRef = Arc::new(StringVector::from_slice(&["hello", "one", "two"])); + assert_eq!(expect, vector); + } } diff --git a/src/query/tests/argmax_test.rs b/src/query/tests/argmax_test.rs index 6d927cf249..18bd990f6a 100644 --- a/src/query/tests/argmax_test.rs +++ b/src/query/tests/argmax_test.rs @@ -6,7 +6,7 @@ use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; use datatypes::for_all_primitive_types; use datatypes::prelude::*; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use function::{create_query_engine, get_numbers_from_table}; use query::error::Result; use query::query_engine::Output; @@ -35,7 +35,7 @@ async fn test_argmax_success( engine: Arc, ) -> Result<()> where - T: Primitive + PartialOrd + DataTypeBuilder, + T: PrimitiveElement + PartialOrd, for<'a> T: Scalar = T>, { let result = execute_argmax(column_name, table_name, engine.clone()) diff --git a/src/query/tests/argmin_test.rs b/src/query/tests/argmin_test.rs index 8112b8f31c..f5d0368f91 100644 --- a/src/query/tests/argmin_test.rs +++ b/src/query/tests/argmin_test.rs @@ -7,7 +7,7 @@ use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; use datatypes::for_all_primitive_types; use datatypes::prelude::*; -use datatypes::types::DataTypeBuilder; 
+use datatypes::types::PrimitiveElement; use function::{create_query_engine, get_numbers_from_table}; use query::error::Result; use query::query_engine::Output; @@ -36,7 +36,7 @@ async fn test_argmin_success( engine: Arc, ) -> Result<()> where - T: Primitive + PartialOrd + DataTypeBuilder, + T: PrimitiveElement + PartialOrd, for<'a> T: Scalar = T>, { let result = execute_argmin(column_name, table_name, engine.clone()) diff --git a/src/query/tests/function.rs b/src/query/tests/function.rs index 69bf2a3c2f..13b259e945 100644 --- a/src/query/tests/function.rs +++ b/src/query/tests/function.rs @@ -8,7 +8,7 @@ use common_recordbatch::{util, RecordBatch}; use datatypes::for_all_primitive_types; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use datatypes::vectors::PrimitiveVector; use query::query_engine::{Output, QueryEngineFactory}; use query::QueryEngine; @@ -59,7 +59,7 @@ pub async fn get_numbers_from_table<'s, T>( engine: Arc, ) -> Vec where - T: Primitive + DataTypeBuilder, + T: PrimitiveElement, for<'a> T: Scalar = T>, { let sql = format!("SELECT {} FROM {}", column_name, table_name); diff --git a/src/query/tests/mean_test.rs b/src/query/tests/mean_test.rs index 93cab0d7a6..4c40c8caed 100644 --- a/src/query/tests/mean_test.rs +++ b/src/query/tests/mean_test.rs @@ -7,7 +7,7 @@ use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; use datatypes::for_all_primitive_types; use datatypes::prelude::*; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use datatypes::value::OrderedFloat; use format_num::NumberFormat; use function::{create_query_engine, get_numbers_from_table}; @@ -39,7 +39,7 @@ async fn test_mean_success( engine: Arc, ) -> Result<()> where - T: Primitive + AsPrimitive + DataTypeBuilder, + T: PrimitiveElement + AsPrimitive, for<'a> T: Scalar = T>, { let result = execute_mean(column_name, 
table_name, engine.clone()) diff --git a/src/query/tests/my_sum_udaf_example.rs b/src/query/tests/my_sum_udaf_example.rs index b2124bdace..8b4651d07d 100644 --- a/src/query/tests/my_sum_udaf_example.rs +++ b/src/query/tests/my_sum_udaf_example.rs @@ -18,7 +18,7 @@ use datafusion::arrow_print; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use datatypes::types::PrimitiveType; use datatypes::vectors::PrimitiveVector; use datatypes::with_match_primitive_type_id; @@ -211,7 +211,7 @@ async fn test_my_sum() -> Result<()> { async fn test_my_sum_with(numbers: Vec, expected: Vec<&str>) -> Result<()> where - T: Primitive + DataTypeBuilder, + T: PrimitiveElement, { let table_name = format!("{}_numbers", std::any::type_name::()); let column_name = format!("{}_number", std::any::type_name::()); diff --git a/src/query/tests/percentile_test.rs b/src/query/tests/percentile_test.rs index 8f06e6d48c..7221be9ed1 100644 --- a/src/query/tests/percentile_test.rs +++ b/src/query/tests/percentile_test.rs @@ -11,7 +11,7 @@ use datafusion::field_util::SchemaExt; use datatypes::for_all_ordered_primitive_types; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use datatypes::vectors::PrimitiveVector; use function::{create_query_engine, get_numbers_from_table}; use num_traits::AsPrimitive; @@ -65,7 +65,7 @@ async fn test_percentile_success( engine: Arc, ) -> Result<()> where - T: Primitive + AsPrimitive + DataTypeBuilder, + T: PrimitiveElement + AsPrimitive, for<'a> T: Scalar = T>, { let result = execute_percentile(column_name, table_name, engine.clone()) @@ -120,7 +120,7 @@ async fn test_percentile_failed( engine: Arc, ) -> Result<()> where - T: Primitive + DataTypeBuilder, + T: PrimitiveElement, { let result = 
execute_percentile(column_name, table_name, engine).await; assert!(result.is_err()); diff --git a/src/query/tests/polyval_test.rs b/src/query/tests/polyval_test.rs index e4c6bb6d44..f7509938be 100644 --- a/src/query/tests/polyval_test.rs +++ b/src/query/tests/polyval_test.rs @@ -7,7 +7,7 @@ use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; use datatypes::for_all_primitive_types; use datatypes::prelude::*; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use function::{create_query_engine, get_numbers_from_table}; use num_traits::AsPrimitive; use query::error::Result; @@ -37,7 +37,7 @@ async fn test_polyval_success( engine: Arc, ) -> Result<()> where - T: Primitive + AsPrimitive + DataTypeBuilder, + T: Primitive + AsPrimitive + PrimitiveElement, PolyT: Primitive + std::ops::Mul + std::iter::Sum, for<'a> T: Scalar = T>, for<'a> PolyT: Scalar = PolyT>, diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index 0b0f1d7c3c..6cfe698f09 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -16,7 +16,7 @@ use datafusion::logical_plan::LogicalPlanBuilder; use datatypes::for_all_ordered_primitive_types; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use datatypes::vectors::{Float32Vector, Float64Vector, PrimitiveVector, UInt32Vector}; use num::NumCast; use query::error::Result; @@ -236,7 +236,7 @@ async fn get_numbers_from_table<'s, T>( engine: Arc, ) -> Vec where - T: Primitive + DataTypeBuilder, + T: PrimitiveElement, for<'a> T: Scalar = T>, { let sql = format!("SELECT {} FROM {}", column_name, table_name); @@ -285,7 +285,7 @@ async fn test_median_success( engine: Arc, ) -> Result<()> where - T: Primitive + Ord + DataTypeBuilder, + T: PrimitiveElement + Ord, for<'a> T: Scalar = T>, { let result = execute_median(column_name, 
table_name, engine.clone()) @@ -324,7 +324,7 @@ async fn test_median_failed( engine: Arc, ) -> Result<()> where - T: Primitive + DataTypeBuilder, + T: PrimitiveElement, { let result = execute_median(column_name, table_name, engine).await; assert!(result.is_err()); diff --git a/src/query/tests/scipy_stats_norm_cdf_test.rs b/src/query/tests/scipy_stats_norm_cdf_test.rs index dde430c23a..b5baf6c31f 100644 --- a/src/query/tests/scipy_stats_norm_cdf_test.rs +++ b/src/query/tests/scipy_stats_norm_cdf_test.rs @@ -7,7 +7,7 @@ use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; use datatypes::for_all_primitive_types; use datatypes::prelude::*; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use function::{create_query_engine, get_numbers_from_table}; use num_traits::AsPrimitive; use query::error::Result; @@ -39,7 +39,7 @@ async fn test_scipy_stats_norm_cdf_success( engine: Arc, ) -> Result<()> where - T: Primitive + AsPrimitive + DataTypeBuilder, + T: PrimitiveElement + AsPrimitive, for<'a> T: Scalar = T>, { let result = execute_scipy_stats_norm_cdf(column_name, table_name, engine.clone()) diff --git a/src/query/tests/scipy_stats_norm_pdf.rs b/src/query/tests/scipy_stats_norm_pdf.rs index ad6fdcc7af..989bb87356 100644 --- a/src/query/tests/scipy_stats_norm_pdf.rs +++ b/src/query/tests/scipy_stats_norm_pdf.rs @@ -7,7 +7,7 @@ use datafusion::field_util::FieldExt; use datafusion::field_util::SchemaExt; use datatypes::for_all_primitive_types; use datatypes::prelude::*; -use datatypes::types::DataTypeBuilder; +use datatypes::types::PrimitiveElement; use function::{create_query_engine, get_numbers_from_table}; use num_traits::AsPrimitive; use query::error::Result; @@ -39,7 +39,7 @@ async fn test_scipy_stats_norm_pdf_success( engine: Arc, ) -> Result<()> where - T: Primitive + AsPrimitive + DataTypeBuilder, + T: PrimitiveElement + AsPrimitive, for<'a> T: Scalar = T>, { let result = execute_scipy_stats_norm_pdf(column_name, 
table_name, engine.clone()) diff --git a/src/script/src/python/coprocessor.rs b/src/script/src/python/coprocessor.rs index 7107e51277..87721b7710 100644 --- a/src/script/src/python/coprocessor.rs +++ b/src/script/src/python/coprocessor.rs @@ -296,9 +296,7 @@ fn create_located(node: T, loc: Location) -> Located { } /// cast a `dyn Array` of type unsigned/int/float into a `dyn Vector` -fn try_into_vector( - arg: Arc, -) -> Result> { +fn try_into_vector(arg: Arc) -> Result> { // wrap try_into_vector in here to convert `datatypes::error::Error` to `python::error::Error` Helper::try_into_vector(arg).context(TypeCastSnafu) }