diff --git a/src/common/time/src/lib.rs b/src/common/time/src/lib.rs index 3b2efc4af2..44c907e7cb 100644 --- a/src/common/time/src/lib.rs +++ b/src/common/time/src/lib.rs @@ -6,5 +6,8 @@ pub mod timestamp; pub mod timestamp_millis; pub mod util; +pub use date::Date; +pub use datetime::DateTime; pub use range::RangeMillis; +pub use timestamp::Timestamp; pub use timestamp_millis::TimestampMillis; diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index d67cfcf9a5..efa105a312 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -50,6 +50,12 @@ pub enum Error { #[snafu(display("{}", msg))] CastType { msg: String, backtrace: Backtrace }, + + #[snafu(display("Arrow failed to compute, source: {}", source))] + ArrowCompute { + source: arrow::error::ArrowError, + backtrace: Backtrace, + }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/scalar.rs b/src/datatypes/src/scalar.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/src/datatypes/src/scalar.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index b9463b8bcd..53b105434b 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -1,14 +1,11 @@ use std::any::Any; -use common_time::timestamp::Timestamp; +use common_time::{Date, DateTime, Timestamp}; use crate::prelude::*; -use crate::vectors::date::DateVector; -use crate::vectors::datetime::DateTimeVector; +use crate::value::{ListValue, ListValueRef}; use crate::vectors::*; -pub mod common; - fn get_iter_capacity>(iter: &I) -> usize { match iter.size_hint() { (_lower, Some(upper)) => upper, @@ -244,9 +241,9 @@ impl<'a> ScalarRef<'a> for &'a [u8] { } } -impl Scalar for common_time::date::Date { +impl Scalar for Date { type VectorType = DateVector; - type RefType<'a> = common_time::date::Date; + type RefType<'a> = Date; fn as_scalar_ref(&self) -> Self::RefType<'_> { *self @@ -257,18 +254,18 @@ impl Scalar for common_time::date::Date { 
} } -impl<'a> ScalarRef<'a> for common_time::date::Date { +impl<'a> ScalarRef<'a> for Date { type VectorType = DateVector; - type ScalarType = common_time::date::Date; + type ScalarType = Date; fn to_owned_scalar(&self) -> Self::ScalarType { *self } } -impl Scalar for common_time::datetime::DateTime { +impl Scalar for DateTime { type VectorType = DateTimeVector; - type RefType<'a> = common_time::datetime::DateTime; + type RefType<'a> = DateTime; fn as_scalar_ref(&self) -> Self::RefType<'_> { *self @@ -279,9 +276,9 @@ impl Scalar for common_time::datetime::DateTime { } } -impl<'a> ScalarRef<'a> for common_time::datetime::DateTime { +impl<'a> ScalarRef<'a> for DateTime { type VectorType = DateTimeVector; - type ScalarType = common_time::datetime::DateTime; + type ScalarType = DateTime; fn to_owned_scalar(&self) -> Self::ScalarType { *self @@ -310,10 +307,41 @@ impl<'a> ScalarRef<'a> for Timestamp { } } +impl Scalar for ListValue { + type VectorType = ListVector; + type RefType<'a> = ListValueRef<'a>; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + ListValueRef::Ref { val: self } + } + + fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> { + long + } +} + +impl<'a> ScalarRef<'a> for ListValueRef<'a> { + type VectorType = ListVector; + type ScalarType = ListValue; + + fn to_owned_scalar(&self) -> Self::ScalarType { + match self { + ListValueRef::Indexed { vector, idx } => match vector.get(*idx) { + // Normally should not get `Value::Null` if the `ListValueRef` comes + // from the iterator of the ListVector, but we avoid panic and just + // returns a default list value in such case since `ListValueRef` may + // be constructed manually. 
+ Value::Null => ListValue::default(), + Value::List(v) => v, + _ => unreachable!(), + }, + ListValueRef::Ref { val } => (*val).clone(), + } + } +} + #[cfg(test)] mod tests { - use common_time::date::Date; - use super::*; use crate::vectors::binary::BinaryVector; use crate::vectors::primitive::Int32Vector; @@ -357,7 +385,7 @@ mod tests { } #[test] - pub fn test_build_date_vector() { + fn test_build_date_vector() { let expect: Vec> = vec![ Some(Date::new(0)), Some(Date::new(-1)), @@ -369,14 +397,49 @@ mod tests { } #[test] - pub fn test_date_scalar() { + fn test_date_scalar() { let date = Date::new(1); assert_eq!(date, date.as_scalar_ref()); assert_eq!(date, date.to_owned_scalar()); } #[test] - pub fn test_build_timestamp_vector() { + fn test_datetime_scalar() { + let dt = DateTime::new(123); + assert_eq!(dt, dt.as_scalar_ref()); + assert_eq!(dt, dt.to_owned_scalar()); + } + + #[test] + fn test_list_value_scalar() { + let list_value = ListValue::new( + Some(Box::new(vec![Value::Int32(123)])), + ConcreteDataType::int32_datatype(), + ); + let list_ref = ListValueRef::Ref { val: &list_value }; + assert_eq!(list_ref, list_value.as_scalar_ref()); + assert_eq!(list_value, list_ref.to_owned_scalar()); + + let mut builder = + ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 1); + builder.push(None); + builder.push(Some(list_value.as_scalar_ref())); + let vector = builder.finish(); + + let ref_on_vec = ListValueRef::Indexed { + vector: &vector, + idx: 0, + }; + assert_eq!(ListValue::default(), ref_on_vec.to_owned_scalar()); + let ref_on_vec = ListValueRef::Indexed { + vector: &vector, + idx: 1, + }; + assert_eq!(list_value, ref_on_vec.to_owned_scalar()); + } + + #[test] + fn test_build_timestamp_vector() { let expect: Vec> = vec![Some(10.into()), None, Some(42.into())]; let vector: TimestampVector = build_vector_from_slice(&expect); assert_vector_eq(&expect, &vector); diff --git a/src/datatypes/src/scalars/common.rs 
b/src/datatypes/src/scalars/common.rs deleted file mode 100644 index 29e6b90517..0000000000 --- a/src/datatypes/src/scalars/common.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::prelude::*; - -pub fn replicate_scalar_vector(c: &C, offsets: &[usize]) -> VectorRef { - debug_assert!( - offsets.len() == c.len(), - "Size of offsets must match size of vector" - ); - - if offsets.is_empty() { - return c.slice(0, 0); - } - let mut builder = <::Builder>::with_capacity(c.len()); - - let mut previous_offset = 0; - for (i, offset) in offsets.iter().enumerate() { - let data = c.get_data(i); - for _ in previous_offset..*offset { - builder.push(data); - } - previous_offset = *offset; - } - builder.to_vector() -} diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 148e3e9995..0f96bbc083 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -1,6 +1,3 @@ -#[cfg(any(test, feature = "test"))] -use crate::data_type::ConcreteDataType; - /// Unique identifier for logical data type. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum LogicalTypeId { @@ -43,7 +40,9 @@ impl LogicalTypeId { /// # Panics /// Panics if data type is not supported. 
#[cfg(any(test, feature = "test"))] - pub fn data_type(&self) -> ConcreteDataType { + pub fn data_type(&self) -> crate::data_type::ConcreteDataType { + use crate::data_type::ConcreteDataType; + match self { LogicalTypeId::Null => ConcreteDataType::null_datatype(), LogicalTypeId::Boolean => ConcreteDataType::boolean_datatype(), diff --git a/src/datatypes/src/types/list_type.rs b/src/datatypes/src/types/list_type.rs index eccc49c6d5..bb95b3fc10 100644 --- a/src/datatypes/src/types/list_type.rs +++ b/src/datatypes/src/types/list_type.rs @@ -45,7 +45,7 @@ impl DataType for ListType { } fn create_mutable_vector(&self, capacity: usize) -> Box { - Box::new(ListVectorBuilder::with_capacity( + Box::new(ListVectorBuilder::with_type_capacity( *self.inner.clone(), capacity, )) diff --git a/src/datatypes/src/types/primitive_type.rs b/src/datatypes/src/types/primitive_type.rs index afce150fd8..ad2d59773d 100644 --- a/src/datatypes/src/types/primitive_type.rs +++ b/src/datatypes/src/types/primitive_type.rs @@ -10,6 +10,7 @@ use snafu::OptionExt; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Result}; use crate::scalars::ScalarVectorBuilder; +use crate::scalars::{Scalar, ScalarRef}; use crate::type_id::LogicalTypeId; use crate::types::primitive_traits::Primitive; use crate::value::{Value, ValueRef}; @@ -30,7 +31,13 @@ impl PartialEq> for PrimitiveType Eq for PrimitiveType {} /// A trait that provide helper methods for a primitive type to implementing the [PrimitiveVector]. -pub trait PrimitiveElement: Primitive { +pub trait PrimitiveElement +where + for<'a> Self: Primitive + + Scalar> + + ScalarRef<'a, ScalarType = Self, VectorType = PrimitiveVector> + + Scalar = Self>, +{ /// Construct the data type struct. 
fn build_data_type() -> ConcreteDataType; diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index c2aa38dd56..3c99a88a2b 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -110,7 +110,7 @@ impl Value { Value::Binary(v) => ValueRef::Binary(v), Value::Date(v) => ValueRef::Date(*v), Value::DateTime(v) => ValueRef::DateTime(*v), - Value::List(v) => ValueRef::List(ListValueRef::Ref(v)), + Value::List(v) => ValueRef::List(ListValueRef::Ref { val: v }), Value::Timestamp(v) => ValueRef::Timestamp(*v), } } @@ -282,6 +282,12 @@ impl ListValue { } } +impl Default for ListValue { + fn default() -> ListValue { + ListValue::new(None, ConcreteDataType::null_datatype()) + } +} + impl PartialOrd for ListValue { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -464,19 +470,32 @@ impl<'a> From<&'a [u8]> for ValueRef<'a> { } } +impl<'a> From>> for ValueRef<'a> { + fn from(list: Option) -> ValueRef { + match list { + Some(v) => ValueRef::List(v), + None => ValueRef::Null, + } + } +} + /// Reference to a [ListValue]. -// Comparison still requires some allocation (call of `to_value()`) and might be avoidable. +/// +/// Now comparison still requires some allocation (call of `to_value()`) and +/// might be avoidable by downcasting and comparing the underlying array slice +/// if it becomes bottleneck. #[derive(Debug, Clone, Copy)] pub enum ListValueRef<'a> { Indexed { vector: &'a ListVector, idx: usize }, - Ref(&'a ListValue), + Ref { val: &'a ListValue }, } impl<'a> ListValueRef<'a> { + /// Convert self to [Value]. This method would clone the underlying data. 
fn to_value(self) -> Value { match self { ListValueRef::Indexed { vector, idx } => vector.get(idx), - ListValueRef::Ref(v) => Value::List((*v).clone()), + ListValueRef::Ref { val } => Value::List(val.clone()), } } } @@ -796,7 +815,7 @@ mod tests { datatype: ConcreteDataType::int32_datatype(), }; assert_eq!( - ValueRef::List(ListValueRef::Ref(&list)), + ValueRef::List(ListValueRef::Ref { val: &list }), Value::List(list.clone()).as_value_ref() ); } @@ -831,7 +850,7 @@ mod tests { items: None, datatype: ConcreteDataType::int32_datatype(), }; - check_as_correct!(ListValueRef::Ref(&list), List, as_list); + check_as_correct!(ListValueRef::Ref { val: &list }, List, as_list); let wrong_value = ValueRef::Int32(12345); assert!(wrong_value.as_binary().is_err()); diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 1d9a45585e..6ba9ac841f 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -9,10 +9,21 @@ mod helper; mod list; pub mod mutable; pub mod null; +mod operations; pub mod primitive; mod string; mod timestamp; +pub mod all { + //! All vector types. + pub use crate::vectors::{ + BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Float32Vector, + Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, ListVector, NullVector, + PrimitiveVector, StringVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector, + UInt8Vector, + }; +} + use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -29,6 +40,7 @@ pub use helper::Helper; pub use list::*; pub use mutable::MutableVector; pub use null::*; +pub use operations::VectorOp; pub use primitive::*; use snafu::ensure; pub use string::*; @@ -59,7 +71,7 @@ impl<'a> Validity<'a> { } /// Vector of data values. -pub trait Vector: Send + Sync + Serializable + Debug { +pub trait Vector: Send + Sync + Serializable + Debug + VectorOp { /// Returns the data type of the vector. /// /// This may require heap allocation. 
@@ -140,10 +152,6 @@ pub trait Vector: Send + Sync + Serializable + Debug { Ok(self.get(index)) } - // Copies each element according offsets parameter. - // (i-th element should be copied offsets[i] - offsets[i - 1] times.) - fn replicate(&self, offsets: &[usize]) -> VectorRef; - /// Returns the reference of value at `index`. /// /// # Panics diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index cd4a09e405..d4332976e5 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -9,7 +9,7 @@ use snafu::{OptionExt, ResultExt}; use crate::arrow_array::{BinaryArray, MutableBinaryArray}; use crate::data_type::ConcreteDataType; use crate::error::{self, Result}; -use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; +use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; @@ -20,6 +20,12 @@ pub struct BinaryVector { array: BinaryArray, } +impl BinaryVector { + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } +} + impl From for BinaryVector { fn from(array: BinaryArray) -> Self { Self { array } @@ -79,10 +85,6 @@ impl Vector for BinaryVector { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - common::replicate_scalar_vector(self, offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { vectors::impl_get_ref_for_vector!(self.array, index) } diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index 3cc682d6dc..d28a825ee9 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -8,7 +8,6 @@ use snafu::{OptionExt, ResultExt}; use crate::data_type::ConcreteDataType; use crate::error::Result; -use crate::scalars::common::replicate_scalar_vector; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; 
use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; @@ -20,6 +19,16 @@ pub struct BooleanVector { array: BooleanArray, } +impl BooleanVector { + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } + + pub(crate) fn as_boolean_array(&self) -> &BooleanArray { + &self.array + } +} + impl From> for BooleanVector { fn from(data: Vec) -> Self { BooleanVector { @@ -95,10 +104,6 @@ impl Vector for BooleanVector { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - replicate_scalar_vector(self, offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { vectors::impl_get_ref_for_vector!(self.array, index) } diff --git a/src/datatypes/src/vectors/constant.rs b/src/datatypes/src/vectors/constant.rs index fa8cdb02af..dcbd8b87b5 100644 --- a/src/datatypes/src/vectors/constant.rs +++ b/src/datatypes/src/vectors/constant.rs @@ -10,7 +10,7 @@ use crate::error::{Result, SerializeSnafu}; use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; use crate::vectors::Helper; -use crate::vectors::{Validity, Vector, VectorRef}; +use crate::vectors::{BooleanVector, Validity, Vector, VectorRef}; #[derive(Clone)] pub struct ConstantVector { @@ -19,7 +19,13 @@ pub struct ConstantVector { } impl ConstantVector { + /// Create a new [ConstantVector]. + /// + /// # Panics + /// Panics if `vector.len() != 1`. pub fn new(vector: VectorRef, length: usize) -> Self { + assert_eq!(1, vector.len()); + // Avoid const recursion. if vector.is_const() { let vec: &ConstantVector = unsafe { Helper::static_cast(&vector) }; @@ -31,6 +37,11 @@ impl ConstantVector { pub fn inner(&self) -> &VectorRef { &self.vector } + + /// Returns the constant value. 
+ pub fn get_constant_ref(&self) -> ValueRef { + self.vector.get_ref(0) + } } impl Vector for ConstantVector { @@ -95,15 +106,6 @@ impl Vector for ConstantVector { self.vector.get(0) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - debug_assert!( - offsets.len() == self.len(), - "Size of offsets must match size of column" - ); - - Arc::new(Self::new(self.vector.clone(), *offsets.last().unwrap())) - } - fn get_ref(&self, _index: usize) -> ValueRef { self.vector.get_ref(0) } @@ -111,18 +113,13 @@ impl Vector for ConstantVector { impl fmt::Debug for ConstantVector { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "ConstantVector([{:?}; {}])", - self.try_get(0).unwrap_or(Value::Null), - self.len() - ) + write!(f, "ConstantVector([{:?}; {}])", self.get(0), self.len()) } } impl Serializable for ConstantVector { fn serialize_to_json(&self) -> Result> { - std::iter::repeat(self.try_get(0)?) + std::iter::repeat(self.get(0)) .take(self.len()) .map(serde_json::Value::try_from) .collect::>() @@ -130,6 +127,33 @@ impl Serializable for ConstantVector { } } +pub(crate) fn replicate_constant(vector: &ConstantVector, offsets: &[usize]) -> VectorRef { + assert_eq!(offsets.len(), vector.len()); + + if offsets.is_empty() { + return vector.slice(0, 0); + } + + Arc::new(ConstantVector::new( + vector.vector.clone(), + *offsets.last().unwrap(), + )) +} + +pub(crate) fn filter_constant( + vector: &ConstantVector, + filter: &BooleanVector, +) -> Result { + let length = filter.len() - filter.as_boolean_array().values().null_count(); + if length == vector.len() { + return Ok(Arc::new(vector.clone())); + } + Ok(Arc::new(ConstantVector::new( + vector.inner().clone(), + length, + ))) +} + #[cfg(test)] mod tests { use arrow::datatypes::DataType as ArrowDataType; diff --git a/src/datatypes/src/vectors/date.rs b/src/datatypes/src/vectors/date.rs index c9f4cb1a56..060cb892a5 100644 --- a/src/datatypes/src/vectors/date.rs +++ b/src/datatypes/src/vectors/date.rs @@ 
-36,6 +36,10 @@ impl DateVector { .clone(), )) } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + self.array.as_arrow() + } } impl Vector for DateVector { @@ -103,10 +107,6 @@ impl Vector for DateVector { } } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - self.array.replicate(offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { match self.array.get(index) { Value::Int32(v) => ValueRef::Date(Date::new(v)), @@ -236,6 +236,15 @@ impl ScalarVectorBuilder for DateVectorBuilder { } } +pub(crate) fn replicate_date(vector: &DateVector, offsets: &[usize]) -> VectorRef { + let array = crate::vectors::primitive::replicate_primitive_with_type( + &vector.array, + offsets, + vector.data_type(), + ); + Arc::new(DateVector { array }) +} + #[cfg(test)] mod tests { use super::*; @@ -293,4 +302,12 @@ mod tests { ])); assert_eq!(expect, vector); } + + #[test] + fn test_date_from_arrow() { + let vector = DateVector::from_slice(&[Date::new(1), Date::new(2)]); + let arrow = vector.as_arrow().slice(0, vector.len()); + let vector2 = DateVector::try_from_arrow_array(&arrow).unwrap(); + assert_eq!(vector, vector2); + } } diff --git a/src/datatypes/src/vectors/datetime.rs b/src/datatypes/src/vectors/datetime.rs index 2b95a67533..9bd36cc165 100644 --- a/src/datatypes/src/vectors/datetime.rs +++ b/src/datatypes/src/vectors/datetime.rs @@ -37,6 +37,10 @@ impl DateTimeVector { .clone(), )) } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + self.array.as_arrow() + } } impl Vector for DateTimeVector { @@ -104,10 +108,6 @@ impl Vector for DateTimeVector { } } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - self.array.replicate(offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { match self.array.get(index) { Value::Int64(v) => ValueRef::DateTime(DateTime::new(v)), @@ -236,6 +236,15 @@ impl ScalarVector for DateTimeVector { } } +pub(crate) fn replicate_datetime(vector: &DateTimeVector, offsets: &[usize]) -> VectorRef { + let array = 
crate::vectors::primitive::replicate_primitive_with_type( + &vector.array, + offsets, + vector.data_type(), + ); + Arc::new(DateTimeVector { array }) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -312,4 +321,12 @@ mod tests { ])); assert_eq!(expect, vector); } + + #[test] + fn test_datetime_from_arrow() { + let vector = DateTimeVector::from_slice(&[DateTime::new(1), DateTime::new(2)]); + let arrow = vector.as_arrow().slice(0, vector.len()); + let vector2 = DateTimeVector::try_from_arrow_array(&arrow).unwrap(); + assert_eq!(vector, vector2); + } } diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index 6afb793e0c..ea979b1c53 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::data_type::DataType; use crate::vectors::{ BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, ListVector, - PrimitiveVector, StringVector, Vector, + PrimitiveVector, StringVector, TimestampVector, Vector, }; use crate::with_match_primitive_type_id; @@ -54,6 +54,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { use crate::data_type::ConcreteDataType::*; + let lhs_type = lhs.data_type(); match lhs.data_type() { Null(_) => true, Boolean(_) => is_vector_eq!(BooleanVector, lhs, rhs), @@ -61,27 +62,32 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { String(_) => is_vector_eq!(StringVector, lhs, rhs), Date(_) => is_vector_eq!(DateVector, lhs, rhs), DateTime(_) => is_vector_eq!(DateTimeVector, lhs, rhs), + Timestamp(_) => is_vector_eq!(TimestampVector, lhs, rhs), List(_) => is_vector_eq!(ListVector, lhs, rhs), - other => with_match_primitive_type_id!(other.logical_type_id(), |$T| { - let lhs = lhs.as_any().downcast_ref::>().unwrap(); - let rhs = rhs.as_any().downcast_ref::>().unwrap(); + UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_) + | Float32(_) | Float64(_) => { + 
with_match_primitive_type_id!(lhs_type.logical_type_id(), |$T| { + let lhs = lhs.as_any().downcast_ref::>().unwrap(); + let rhs = rhs.as_any().downcast_ref::>().unwrap(); - lhs == rhs - }, - { - unreachable!() - }), + lhs == rhs + }, + { + unreachable!("should not compare {} with {}", lhs.vector_type_name(), rhs.vector_type_name()) + }) + } } } #[cfg(test)] mod tests { - use arrow::array::{Int64Array, ListArray, MutableListArray, MutablePrimitiveArray, TryExtend}; + use arrow::array::{ListArray, MutableListArray, MutablePrimitiveArray, TryExtend}; use super::*; use crate::vectors::{ Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, - NullVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, VectorRef, + NullVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, + VectorRef, }; fn assert_vector_ref_eq(vector: VectorRef) { @@ -111,10 +117,8 @@ mod tests { ))); assert_vector_ref_eq(Arc::new(BooleanVector::from(vec![true, false]))); assert_vector_ref_eq(Arc::new(DateVector::from(vec![Some(100), Some(120)]))); - assert_vector_ref_eq(Arc::new(DateTimeVector::new(Int64Array::from(vec![ - Some(100), - Some(120), - ])))); + assert_vector_ref_eq(Arc::new(DateTimeVector::from(vec![Some(100), Some(120)]))); + assert_vector_ref_eq(Arc::new(TimestampVector::from_values([100, 120]))); let mut arrow_array = MutableListArray::>::new(); arrow_array @@ -171,7 +175,7 @@ mod tests { 5, )), Arc::new(ConstantVector::new( - Arc::new(BooleanVector::from(vec![true, false])), + Arc::new(BooleanVector::from(vec![false])), 4, )), ); @@ -181,7 +185,7 @@ mod tests { 5, )), Arc::new(ConstantVector::new( - Arc::new(Int32Vector::from_slice(vec![1, 2])), + Arc::new(Int32Vector::from_slice(vec![1])), 4, )), ); diff --git a/src/datatypes/src/vectors/list.rs b/src/datatypes/src/vectors/list.rs index 6269f550a9..d5bb24bcfc 100644 --- a/src/datatypes/src/vectors/list.rs +++ b/src/datatypes/src/vectors/list.rs @@ -1,7 +1,9 @@ use 
std::any::Any; +use std::ops::Range; use std::sync::Arc; use arrow::array::{Array, ArrayRef, ListArray}; +use arrow::bitmap::utils::ZipValidity; use arrow::bitmap::MutableBitmap; use arrow::datatypes::DataType as ArrowDataType; use serde_json::Value as JsonValue; @@ -24,9 +26,17 @@ pub struct ListVector { } impl ListVector { + /// Only iterate values in the [ListVector]. + /// + /// Be careful to use this method as it would ignore validity and replace null + /// by empty vector. pub fn values_iter(&self) -> Box> + '_> { Box::new(self.array.values_iter().map(VectorHelper::try_into_vector)) } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } } impl Vector for ListVector { @@ -93,13 +103,6 @@ impl Vector for ListVector { )) } - fn replicate(&self, _: &[usize]) -> VectorRef { - // ListVector can be a scalar vector for implementing this `replicate` method. However, - // that requires a lot of efforts, starting from not using Arrow's ListArray. - // Refer to Databend's `ArrayColumn` for more details. 
- unimplemented!() - } - fn get_ref(&self, index: usize) -> ValueRef { ValueRef::List(ListValueRef::Indexed { vector: self, @@ -137,6 +140,70 @@ impl From for ListVector { impl_try_from_arrow_array_for_vector!(ArrowListArray, ListVector); +pub struct ListVectorIter<'a> { + vector: &'a ListVector, + iter: ZipValidity<'a, usize, Range>, +} + +impl<'a> ListVectorIter<'a> { + pub fn new(vector: &'a ListVector) -> ListVectorIter<'a> { + let iter = ZipValidity::new( + 0..vector.len(), + vector.array.validity().as_ref().map(|x| x.iter()), + ); + + Self { vector, iter } + } +} + +impl<'a> Iterator for ListVectorIter<'a> { + type Item = Option>; + + #[inline] + fn next(&mut self) -> Option { + self.iter.next().map(|idx_opt| { + idx_opt.map(|idx| ListValueRef::Indexed { + vector: self.vector, + idx, + }) + }) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + + #[inline] + fn nth(&mut self, n: usize) -> Option { + self.iter.nth(n).map(|idx_opt| { + idx_opt.map(|idx| ListValueRef::Indexed { + vector: self.vector, + idx, + }) + }) + } +} + +impl ScalarVector for ListVector { + type OwnedItem = ListValue; + type RefItem<'a> = ListValueRef<'a>; + type Iter<'a> = ListVectorIter<'a>; + type Builder = ListVectorBuilder; + + fn get_data(&self, idx: usize) -> Option> { + if self.array.is_valid(idx) { + Some(ListValueRef::Indexed { vector: self, idx }) + } else { + None + } + } + + fn iter_data(&self) -> Self::Iter<'_> { + ListVectorIter::new(self) + } +} + // Some codes are ported from arrow2's MutableListArray. 
pub struct ListVectorBuilder { inner_type: ConcreteDataType, @@ -146,7 +213,7 @@ pub struct ListVectorBuilder { } impl ListVectorBuilder { - pub fn with_capacity(inner_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder { + pub fn with_type_capacity(inner_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder { let mut offsets = Vec::with_capacity(capacity + 1); offsets.push(0); // The actual required capacity might greater than the capacity of the `ListVector` @@ -224,19 +291,7 @@ impl MutableVector for ListVectorBuilder { } fn to_vector(&mut self) -> VectorRef { - let array = ArrowListArray::try_new( - ConcreteDataType::list_datatype(self.inner_type.clone()).as_arrow_type(), - std::mem::take(&mut self.offsets).into(), - self.values.to_vector().to_arrow_array(), - std::mem::take(&mut self.validity).map(|x| x.into()), - ) - .unwrap(); // The `ListVectorBuilder` itself should ensure it always builds a valid array. - - let vector = ListVector { - array, - inner_datatype: self.inner_type.clone(), - }; - Arc::new(vector) + Arc::new(self.finish()) } fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { @@ -246,7 +301,7 @@ impl MutableVector for ListVectorBuilder { Some(list_value) => self.push_list_value(list_value)?, None => self.push_null(), }, - ListValueRef::Ref(list_value) => self.push_list_value(list_value)?, + ListValueRef::Ref { val } => self.push_list_value(val)?, } } else { self.push_null(); @@ -265,6 +320,41 @@ impl MutableVector for ListVectorBuilder { } } +impl ScalarVectorBuilder for ListVectorBuilder { + type VectorType = ListVector; + + fn with_capacity(_capacity: usize) -> Self { + panic!("Must use ListVectorBuilder::with_type_capacity()"); + } + + fn push(&mut self, value: Option<::RefItem<'_>>) { + // We expect the input ListValue has the same inner type as the builder when using + // push(), so just panic if `push_value_ref()` returns error, which indicate an + // invalid input value type. 
+ self.push_value_ref(value.into()).unwrap_or_else(|e| { + panic!( + "Failed to push value, expect value type {:?}, err:{}", + self.inner_type, e + ); + }); + } + + fn finish(&mut self) -> Self::VectorType { + let array = ArrowListArray::try_new( + ConcreteDataType::list_datatype(self.inner_type.clone()).as_arrow_type(), + std::mem::take(&mut self.offsets).into(), + self.values.to_vector().to_arrow_array(), + std::mem::take(&mut self.validity).map(|x| x.into()), + ) + .unwrap(); // The `ListVectorBuilder` itself should ensure it always builds a valid array. + + ListVector { + array, + inner_datatype: self.inner_type.clone(), + } + } +} + #[cfg(test)] mod tests { use arrow::array::{MutableListArray, MutablePrimitiveArray, TryExtend}; @@ -445,14 +535,16 @@ mod tests { let mut builder = ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3); builder - .push_value_ref(ValueRef::List(ListValueRef::Ref(&ListValue::new( - Some(Box::new(vec![ - Value::Int32(4), - Value::Null, - Value::Int32(6), - ])), - ConcreteDataType::int32_datatype(), - )))) + .push_value_ref(ValueRef::List(ListValueRef::Ref { + val: &ListValue::new( + Some(Box::new(vec![ + Value::Int32(4), + Value::Null, + Value::Int32(6), + ])), + ConcreteDataType::int32_datatype(), + ), + })) .unwrap(); assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); @@ -475,4 +567,59 @@ mod tests { ])); assert_eq!(expect, vector); } + + #[test] + fn test_list_vector_for_scalar() { + let mut builder = + ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 2); + builder.push(None); + builder.push(Some(ListValueRef::Ref { + val: &ListValue::new( + Some(Box::new(vec![ + Value::Int32(4), + Value::Null, + Value::Int32(6), + ])), + ConcreteDataType::int32_datatype(), + ), + })); + let vector = builder.finish(); + + let expect = new_list_vector(vec![None, Some(vec![Some(4), None, Some(6)])]); + assert_eq!(expect, vector); + + assert!(vector.get_data(0).is_none()); + assert_eq!( + 
ListValueRef::Indexed { + vector: &vector, + idx: 1 + }, + vector.get_data(1).unwrap() + ); + assert_eq!( + *vector.get(1).as_list().unwrap().unwrap(), + vector.get_data(1).unwrap().to_owned_scalar() + ); + + let mut iter = vector.iter_data(); + assert!(iter.next().unwrap().is_none()); + assert_eq!( + ListValueRef::Indexed { + vector: &vector, + idx: 1 + }, + iter.next().unwrap().unwrap() + ); + assert!(iter.next().is_none()); + + let mut iter = vector.iter_data(); + assert_eq!(2, iter.size_hint().0); + assert_eq!( + ListValueRef::Indexed { + vector: &vector, + idx: 1 + }, + iter.nth(1).unwrap().unwrap() + ); + } } diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index 2f483b0ea5..329210886e 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -25,6 +25,10 @@ impl NullVector { array: NullArray::new(ArrowDataType::Null, n), } } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } } impl From for NullVector { @@ -83,17 +87,6 @@ impl Vector for NullVector { Value::Null } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - debug_assert!( - offsets.len() == self.len(), - "Size of offsets must match size of column" - ); - - Arc::new(Self { - array: NullArray::new(ArrowDataType::Null, *offsets.last().unwrap() as usize), - }) - } - fn get_ref(&self, _index: usize) -> ValueRef { // Skips bound check for null array. 
ValueRef::Null @@ -179,6 +172,12 @@ impl MutableVector for NullVectorBuilder { } } +pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef { + assert_eq!(offsets.len(), vector.len()); + + Arc::new(NullVector::new(*offsets.last().unwrap())) +} + #[cfg(test)] mod tests { use serde_json; diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs new file mode 100644 index 0000000000..ede948ca02 --- /dev/null +++ b/src/datatypes/src/vectors/operations.rs @@ -0,0 +1,121 @@ +mod dedup; +mod filter; +mod replicate; + +use arrow::bitmap::MutableBitmap; + +use crate::error::Result; +use crate::types::PrimitiveElement; +use crate::vectors::all::*; +use crate::vectors::{Vector, VectorRef}; + +/// Vector compute operations. +pub trait VectorOp { + /// Copies each element according to the `offsets` parameter. + /// (`i-th` element should be copied `offsets[i] - offsets[i - 1]` times.) + /// + /// # Panics + /// Panics if `offsets.len() != self.len()`. + fn replicate(&self, offsets: &[usize]) -> VectorRef; + + /// Dedup elements in `self` and mark `i-th` bit of `selected` to `true` if the `i-th` element + /// of `self` is retained. + /// + /// The caller should ensure + /// 1. the `selected` bitmap is initialized by setting `[0, self.len())` + /// bits to false. + /// 2. `self` and `prev_vector` are sorted. + /// + /// If there are multiple duplicate elements, this function retains the **first** element. + /// If the first element of `self` is equal to the last element of `prev_vector`, then that + /// first element is also considered as duplicated and won't be retained. + /// + /// # Panics + /// Panics if + /// - `selected.len() < self.len()`. + /// - `prev_vector` and `self` have different data types. + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>); + + /// Filters the vector, returns elements matching the `filter` (i.e. where the values are true). 
+ /// + /// Note that the nulls of `filter` are interpreted as `false`, which leads to these elements being masked out. + fn filter(&self, filter: &BooleanVector) -> Result; +} + +macro_rules! impl_scalar_vector_op { + ($( { $VectorType: ident, $replicate: ident } ),+) => {$( + impl VectorOp for $VectorType { + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::$replicate(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = prev_vector.map(|pv| pv.as_any().downcast_ref::<$VectorType>().unwrap()); + dedup::dedup_scalar(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_non_constant!(self, $VectorType, filter) + } + } + )+}; +} + +impl_scalar_vector_op!( + { BinaryVector, replicate_scalar }, + { BooleanVector, replicate_scalar }, + { ListVector, replicate_scalar }, + { StringVector, replicate_scalar }, + { DateVector, replicate_date }, + { DateTimeVector, replicate_datetime }, + { TimestampVector, replicate_timestamp } +); + +impl VectorOp for ConstantVector { + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::replicate_constant(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::()); + dedup::dedup_constant(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_constant(self, filter) + } +} + +impl VectorOp for NullVector { + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::replicate_null(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::()); + dedup::dedup_null(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_non_constant!(self,
NullVector, filter) + } +} + +impl VectorOp for PrimitiveVector +where + T: PrimitiveElement, +{ + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::replicate_primitive(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = + prev_vector.and_then(|pv| pv.as_any().downcast_ref::>()); + dedup::dedup_scalar(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_non_constant!(self, PrimitiveVector, filter) + } +} diff --git a/src/datatypes/src/vectors/operations/dedup.rs b/src/datatypes/src/vectors/operations/dedup.rs new file mode 100644 index 0000000000..33ea0dfbb7 --- /dev/null +++ b/src/datatypes/src/vectors/operations/dedup.rs @@ -0,0 +1,223 @@ +use arrow::bitmap::MutableBitmap; + +use crate::scalars::ScalarVector; +use crate::vectors::{ConstantVector, NullVector, Vector}; + +pub(crate) fn dedup_scalar<'a, T: ScalarVector>( + vector: &'a T, + selected: &'a mut MutableBitmap, + prev_vector: Option<&'a T>, +) where + T::RefItem<'a>: PartialEq, +{ + assert!(selected.len() >= vector.len()); + + if vector.is_empty() { + return; + } + + for ((i, current), next) in vector + .iter_data() + .enumerate() + .zip(vector.iter_data().skip(1)) + { + if current != next { + // If next element is a different element, we mark it as selected. + selected.set(i + 1, true); + } + } + + // Always retain the first element. + selected.set(0, true); + + // Then check whether to keep the first element, based on the last element of the previous vector.
+ if let Some(pv) = &prev_vector { + if !pv.is_empty() { + let last = pv.get_data(pv.len() - 1); + if last == vector.get_data(0) { + selected.set(0, false); + } + } + } +} + +pub(crate) fn dedup_null( + vector: &NullVector, + selected: &mut MutableBitmap, + prev_vector: Option<&NullVector>, +) { + if vector.is_empty() { + return; + } + + let no_prev_element = prev_vector.map(|v| v.is_empty()).unwrap_or(true); + if no_prev_element { + // Retain first element if no previous element (we know that it must + // be null). + selected.set(0, true); + } +} + +pub(crate) fn dedup_constant( + vector: &ConstantVector, + selected: &mut MutableBitmap, + prev_vector: Option<&ConstantVector>, +) { + if vector.is_empty() { + return; + } + + let equal_to_prev = if let Some(prev) = prev_vector { + !prev.is_empty() && vector.get_constant_ref() == prev.get_constant_ref() + } else { + false + }; + + if !equal_to_prev { + selected.set(0, true); + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::vectors::{Int32Vector, StringVector, VectorOp}; + + fn check_bitmap(expect: &[bool], selected: &MutableBitmap) { + assert_eq!(expect.len(), selected.len()); + for (exp, v) in expect.iter().zip(selected.iter()) { + assert_eq!(*exp, v); + } + } + + fn check_dedup_scalar(expect: &[bool], input: &[i32], prev: Option<&[i32]>) { + check_dedup_scalar_opt(expect, input.iter().map(|v| Some(*v)), prev); + } + + fn check_dedup_scalar_opt( + expect: &[bool], + input: impl Iterator>, + prev: Option<&[i32]>, + ) { + let input = Int32Vector::from_iter(input); + let prev = prev.map(Int32Vector::from_slice); + + let mut selected = MutableBitmap::from_len_zeroed(input.len()); + input.dedup(&mut selected, prev.as_ref().map(|v| v as _)); + + check_bitmap(expect, &selected); + } + + #[test] + fn test_dedup_scalar() { + check_dedup_scalar(&[], &[], None); + check_dedup_scalar(&[true], &[1], None); + check_dedup_scalar(&[true, false], &[1, 1], None); + check_dedup_scalar(&[true,
true], &[1, 2], None); + check_dedup_scalar(&[true, true, true, true], &[1, 2, 3, 4], None); + check_dedup_scalar(&[true, false, true, false], &[1, 1, 3, 3], None); + check_dedup_scalar(&[true, false, false, false, true], &[2, 2, 2, 2, 3], None); + + check_dedup_scalar(&[true], &[5], Some(&[])); + check_dedup_scalar(&[true], &[5], Some(&[3])); + check_dedup_scalar(&[false], &[5], Some(&[5])); + check_dedup_scalar(&[false], &[5], Some(&[4, 5])); + check_dedup_scalar(&[false, true], &[5, 6], Some(&[4, 5])); + check_dedup_scalar(&[false, true, false], &[5, 6, 6], Some(&[4, 5])); + check_dedup_scalar( + &[false, true, false, true, true], + &[5, 6, 6, 7, 8], + Some(&[4, 5]), + ); + + check_dedup_scalar_opt( + &[true, true, false, true, false], + [Some(1), Some(2), Some(2), None, None].into_iter(), + None, + ); + } + + fn check_dedup_null(len: usize) { + let input = NullVector::new(len); + let mut selected = MutableBitmap::from_len_zeroed(input.len()); + input.dedup(&mut selected, None); + + let mut expect = vec![false; len]; + if !expect.is_empty() { + expect[0] = true; + } + check_bitmap(&expect, &selected); + + let mut selected = MutableBitmap::from_len_zeroed(input.len()); + let prev = Some(NullVector::new(1)); + input.dedup(&mut selected, prev.as_ref().map(|v| v as _)); + let expect = vec![false; len]; + check_bitmap(&expect, &selected); + } + + #[test] + fn test_dedup_null() { + for len in 0..5 { + check_dedup_null(len); + } + } + + fn check_dedup_constant(len: usize) { + let input = ConstantVector::new(Arc::new(Int32Vector::from_slice(&[8])), len); + let mut selected = MutableBitmap::from_len_zeroed(len); + input.dedup(&mut selected, None); + + let mut expect = vec![false; len]; + if !expect.is_empty() { + expect[0] = true; + } + check_bitmap(&expect, &selected); + + let mut selected = MutableBitmap::from_len_zeroed(len); + let prev = Some(ConstantVector::new( + Arc::new(Int32Vector::from_slice(&[8])), + 1, + )); + input.dedup(&mut selected, prev.as_ref().map(|v| 
v as _)); + let expect = vec![false; len]; + check_bitmap(&expect, &selected); + } + + #[test] + fn test_dedup_constant() { + for len in 0..5 { + check_dedup_constant(len); + } + } + + #[test] + fn test_dedup_string() { + let input = StringVector::from_slice(&["a", "a", "b", "c"]); + let mut selected = MutableBitmap::from_len_zeroed(4); + input.dedup(&mut selected, None); + let expect = vec![true, false, true, true]; + check_bitmap(&expect, &selected); + } + + macro_rules! impl_dedup_date_like_test { + ($VectorType: ident, $ValueType: ident, $method: ident) => {{ + use common_time::$ValueType; + use $crate::vectors::$VectorType; + + let v = $VectorType::from_iterator([8, 8, 9, 10].into_iter().map($ValueType::$method)); + let mut selected = MutableBitmap::from_len_zeroed(4); + v.dedup(&mut selected, None); + let expect = vec![true, false, true, true]; + check_bitmap(&expect, &selected); + }}; + } + + #[test] + fn test_dedup_date_like() { + impl_dedup_date_like_test!(DateVector, Date, new); + impl_dedup_date_like_test!(DateTimeVector, DateTime, new); + impl_dedup_date_like_test!(TimestampVector, Timestamp, from_millis); + } +} diff --git a/src/datatypes/src/vectors/operations/filter.rs b/src/datatypes/src/vectors/operations/filter.rs new file mode 100644 index 0000000000..4ae0398047 --- /dev/null +++ b/src/datatypes/src/vectors/operations/filter.rs @@ -0,0 +1,114 @@ +pub(crate) use crate::vectors::constant::filter_constant; + +macro_rules! 
filter_non_constant { + ($vector: expr, $VectorType: ty, $filter: ident) => {{ + use std::sync::Arc; + + use snafu::ResultExt; + + let arrow_array = $vector.as_arrow(); + let filtered = arrow::compute::filter::filter(arrow_array, $filter.as_boolean_array()) + .context(crate::error::ArrowComputeSnafu)?; + Ok(Arc::new(<$VectorType>::try_from_arrow_array(filtered)?)) + }}; +} + +pub(crate) use filter_non_constant; + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::scalars::ScalarVector; + use crate::vectors::{ + BooleanVector, ConstantVector, Int32Vector, NullVector, StringVector, VectorOp, VectorRef, + }; + + fn check_filter_primitive(expect: &[i32], input: &[i32], filter: &[bool]) { + let v = Int32Vector::from_slice(&input); + let filter = BooleanVector::from_slice(filter); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new(Int32Vector::from_slice(&expect)); + assert_eq!(expect, out); + } + + #[test] + fn test_filter_primitive() { + check_filter_primitive(&[], &[], &[]); + check_filter_primitive(&[5], &[5], &[true]); + check_filter_primitive(&[], &[5], &[false]); + check_filter_primitive(&[], &[5, 6], &[false, false]); + check_filter_primitive(&[5, 6], &[5, 6], &[true, true]); + check_filter_primitive(&[], &[5, 6, 7], &[false, false, false]); + check_filter_primitive(&[5], &[5, 6, 7], &[true, false, false]); + check_filter_primitive(&[6], &[5, 6, 7], &[false, true, false]); + check_filter_primitive(&[7], &[5, 6, 7], &[false, false, true]); + check_filter_primitive(&[5, 7], &[5, 6, 7], &[true, false, true]); + } + + fn check_filter_constant(expect_length: usize, input_length: usize, filter: &[bool]) { + let v = ConstantVector::new(Arc::new(Int32Vector::from_slice(&[123])), input_length); + let filter = BooleanVector::from_slice(filter); + let out = v.filter(&filter).unwrap(); + + assert!(out.is_const()); + assert_eq!(expect_length, out.len()); + } + + #[test] + fn test_filter_constant() { + check_filter_constant(0, 0, &[]); + 
check_filter_constant(1, 1, &[true]); + check_filter_constant(0, 1, &[false]); + check_filter_constant(1, 2, &[false, true]); + check_filter_constant(2, 2, &[true, true]); + check_filter_constant(1, 4, &[false, false, false, true]); + check_filter_constant(2, 4, &[false, true, false, true]); + } + + #[test] + fn test_filter_scalar() { + let v = StringVector::from_slice(&["0", "1", "2", "3"]); + let filter = BooleanVector::from_slice(&[false, true, false, true]); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new(StringVector::from_slice(&["1", "3"])); + assert_eq!(expect, out); + } + + #[test] + fn test_filter_null() { + let v = NullVector::new(5); + let filter = BooleanVector::from_slice(&[false, true, false, true, true]); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new(NullVector::new(3)); + assert_eq!(expect, out); + } + + macro_rules! impl_filter_date_like_test { + ($VectorType: ident, $ValueType: ident, $method: ident) => {{ + use std::sync::Arc; + + use common_time::$ValueType; + use $crate::vectors::{$VectorType, VectorRef}; + + let v = $VectorType::from_iterator((0..5).map($ValueType::$method)); + let filter = BooleanVector::from_slice(&[false, true, false, true, true]); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new($VectorType::from_iterator( + [1, 3, 4].into_iter().map($ValueType::$method), + )); + assert_eq!(expect, out); + }}; + } + + #[test] + fn test_filter_date_like() { + impl_filter_date_like_test!(DateVector, Date, new); + impl_filter_date_like_test!(DateTimeVector, DateTime, new); + impl_filter_date_like_test!(TimestampVector, Timestamp, from_millis); + } +} diff --git a/src/datatypes/src/vectors/operations/replicate.rs b/src/datatypes/src/vectors/operations/replicate.rs new file mode 100644 index 0000000000..8ed712fd40 --- /dev/null +++ b/src/datatypes/src/vectors/operations/replicate.rs @@ -0,0 +1,108 @@ +use crate::prelude::*; +pub(crate) use 
crate::vectors::constant::replicate_constant; +pub(crate) use crate::vectors::date::replicate_date; +pub(crate) use crate::vectors::datetime::replicate_datetime; +pub(crate) use crate::vectors::null::replicate_null; +pub(crate) use crate::vectors::primitive::replicate_primitive; +pub(crate) use crate::vectors::timestamp::replicate_timestamp; + +pub(crate) fn replicate_scalar(c: &C, offsets: &[usize]) -> VectorRef { + assert_eq!(offsets.len(), c.len()); + + if offsets.is_empty() { + return c.slice(0, 0); + } + let mut builder = <::Builder>::with_capacity(c.len()); + + let mut previous_offset = 0; + for (i, offset) in offsets.iter().enumerate() { + let data = c.get_data(i); + for _ in previous_offset..*offset { + builder.push(data); + } + previous_offset = *offset; + } + builder.to_vector() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::vectors::{ConstantVector, Int32Vector, NullVector, StringVector, VectorOp}; + + #[test] + fn test_replicate_primitive() { + let v = Int32Vector::from_iterator(0..5); + let offsets = [0, 1, 2, 3, 4]; + + let v = v.replicate(&offsets); + assert_eq!(4, v.len()); + + for i in 0..4 { + assert_eq!(Value::Int32(i as i32 + 1), v.get(i)); + } + } + + #[test] + fn test_replicate_scalar() { + let v = StringVector::from_slice(&["0", "1", "2", "3"]); + let offsets = [1, 3, 5, 6]; + + let v = v.replicate(&offsets); + assert_eq!(6, v.len()); + + let expect: VectorRef = Arc::new(StringVector::from_slice(&["0", "1", "1", "2", "2", "3"])); + assert_eq!(expect, v); + } + + #[test] + fn test_replicate_constant() { + let v = Arc::new(StringVector::from_slice(&["hello"])); + let cv = ConstantVector::new(v.clone(), 2); + let offsets = [1, 4]; + + let cv = cv.replicate(&offsets); + assert_eq!(4, cv.len()); + + let expect: VectorRef = Arc::new(ConstantVector::new(v, 4)); + assert_eq!(expect, cv); + } + + #[test] + fn test_replicate_null() { + let v = NullVector::new(3); + let offsets = [1, 3, 5]; + + let v = 
v.replicate(&offsets); + assert_eq!(5, v.len()); + } + + macro_rules! impl_replicate_date_like_test { + ($VectorType: ident, $ValueType: ident, $method: ident) => {{ + use common_time::$ValueType; + use $crate::vectors::$VectorType; + + let v = $VectorType::from_iterator((0..5).map($ValueType::$method)); + let offsets = [0, 1, 2, 3, 4]; + + let v = v.replicate(&offsets); + assert_eq!(4, v.len()); + + for i in 0..4 { + assert_eq!( + Value::$ValueType($ValueType::$method((i as i32 + 1).into())), + v.get(i) + ); + } + }}; + } + + #[test] + fn test_replicate_date_like() { + impl_replicate_date_like_test!(DateVector, Date, new); + impl_replicate_date_like_test!(DateTimeVector, DateTime, new); + impl_replicate_date_like_test!(TimestampVector, Timestamp, from_millis); + } +} diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 642cee4940..a014c3cb22 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -8,7 +8,7 @@ use arrow::bitmap::utils::ZipValidity; use serde_json::Value as JsonValue; use snafu::{OptionExt, ResultExt}; -use crate::data_type::ConcreteDataType; +use crate::data_type::{ConcreteDataType, DataType}; use crate::error::ConversionSnafu; use crate::error::{Result, SerializeSnafu}; use crate::scalars::{Scalar, ScalarRef}; @@ -59,6 +59,14 @@ impl PrimitiveVector { array: PrimitiveArray::from_values(iter), } } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } + + fn slice(&self, offset: usize, length: usize) -> Self { + Self::from(self.array.slice(offset, length)) + } } impl Vector for PrimitiveVector { @@ -99,40 +107,13 @@ impl Vector for PrimitiveVector { } fn slice(&self, offset: usize, length: usize) -> VectorRef { - Arc::new(Self::from(self.array.slice(offset, length))) + Arc::new(self.slice(offset, length)) } fn get(&self, index: usize) -> Value { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> 
VectorRef { - debug_assert!( - offsets.len() == self.len(), - "Size of offsets must match size of column" - ); - - if offsets.is_empty() { - return self.slice(0, 0); - } - - let mut builder = - PrimitiveVectorBuilder::::with_capacity(*offsets.last().unwrap() as usize); - - let mut previous_offset = 0; - - for (i, offset) in offsets.iter().enumerate() { - let data = unsafe { self.array.value_unchecked(i) }; - builder.mutable_array.extend( - std::iter::repeat(data) - .take(*offset - previous_offset) - .map(Option::Some), - ); - previous_offset = *offset; - } - builder.to_vector() - } - fn get_ref(&self, index: usize) -> ValueRef { if self.array.is_valid(index) { // Safety: The index have been checked by `is_valid()`. @@ -167,9 +148,7 @@ impl>> FromIterator for Pr impl ScalarVector for PrimitiveVector where - T: Scalar + PrimitiveElement, - for<'a> T: ScalarRef<'a, ScalarType = T, VectorType = Self>, - for<'a> T: Scalar = T>, + T: PrimitiveElement, { type OwnedItem = T; type RefItem<'a> = T; @@ -216,16 +195,18 @@ impl<'a, T: Copy> Iterator for PrimitiveIter<'a, T> { } } -pub struct PrimitiveVectorBuilder { - pub(crate) mutable_array: MutablePrimitiveArray, +impl Serializable for PrimitiveVector { + fn serialize_to_json(&self) -> Result> { + self.array + .iter() + .map(serde_json::to_value) + .collect::>() + .context(SerializeSnafu) + } } -impl PrimitiveVectorBuilder { - fn with_capacity(capacity: usize) -> Self { - Self { - mutable_array: MutablePrimitiveArray::with_capacity(capacity), - } - } +pub struct PrimitiveVectorBuilder { + pub(crate) mutable_array: MutablePrimitiveArray, } pub type UInt8VectorBuilder = PrimitiveVectorBuilder; @@ -259,9 +240,7 @@ impl MutableVector for PrimitiveVectorBuilder { } fn to_vector(&mut self) -> VectorRef { - Arc::new(PrimitiveVector:: { - array: std::mem::take(&mut self.mutable_array).into(), - }) + Arc::new(self.finish()) } fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { @@ -304,16 +283,58 @@ where } } -impl 
Serializable for PrimitiveVector { - fn serialize_to_json(&self) -> Result> { - self.array - .iter() - .map(serde_json::to_value) - .collect::>() - .context(SerializeSnafu) +impl PrimitiveVectorBuilder { + fn with_type_capacity(data_type: ConcreteDataType, capacity: usize) -> Self { + Self { + mutable_array: MutablePrimitiveArray::with_capacity_from( + capacity, + data_type.as_arrow_type(), + ), + } } } +pub(crate) fn replicate_primitive( + vector: &PrimitiveVector, + offsets: &[usize], +) -> VectorRef { + Arc::new(replicate_primitive_with_type( + vector, + offsets, + T::build_data_type(), + )) +} + +pub(crate) fn replicate_primitive_with_type( + vector: &PrimitiveVector, + offsets: &[usize], + data_type: ConcreteDataType, +) -> PrimitiveVector { + assert_eq!(offsets.len(), vector.len()); + + if offsets.is_empty() { + return vector.slice(0, 0); + } + + let mut builder = PrimitiveVectorBuilder::::with_type_capacity( + data_type, + *offsets.last().unwrap() as usize, + ); + + let mut previous_offset = 0; + + for (i, offset) in offsets.iter().enumerate() { + let data = unsafe { vector.array.value_unchecked(i) }; + builder.mutable_array.extend( + std::iter::repeat(data) + .take(*offset - previous_offset) + .map(Option::Some), + ); + previous_offset = *offset; + } + builder.finish() +} + #[cfg(test)] mod tests { use arrow::datatypes::DataType as ArrowDataType; @@ -425,20 +446,6 @@ mod tests { assert_eq!(Validity::AllValid, vector.validity()); } - #[test] - fn test_replicate() { - let v = PrimitiveVector::::from_slice((0..5).collect::>()); - - let offsets = [0usize, 1usize, 2usize, 3usize, 4usize]; - - let v = v.replicate(&offsets); - assert_eq!(4, v.len()); - - for i in 0..4 { - assert_eq!(Value::Int32(i as i32 + 1), v.get(i)); - } - } - #[test] fn test_memory_size() { let v = PrimitiveVector::::from_slice((0..5).collect::>()); diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs index 02664d1e26..fc75153581 100644 --- 
a/src/datatypes/src/vectors/string.rs +++ b/src/datatypes/src/vectors/string.rs @@ -9,7 +9,7 @@ use snafu::{OptionExt, ResultExt}; use crate::arrow_array::{MutableStringArray, StringArray}; use crate::data_type::ConcreteDataType; use crate::error::{Result, SerializeSnafu}; -use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; +use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::types::StringType; use crate::value::{Value, ValueRef}; @@ -21,6 +21,12 @@ pub struct StringVector { array: StringArray, } +impl StringVector { + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } +} + impl From for StringVector { fn from(array: StringArray) -> Self { Self { array } @@ -112,10 +118,6 @@ impl Vector for StringVector { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - common::replicate_scalar_vector(self, offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { vectors::impl_get_ref_for_vector!(self.array, index) } diff --git a/src/datatypes/src/vectors/timestamp.rs b/src/datatypes/src/vectors/timestamp.rs index 77bf8dea35..0b47dc1f5e 100644 --- a/src/datatypes/src/vectors/timestamp.rs +++ b/src/datatypes/src/vectors/timestamp.rs @@ -48,6 +48,10 @@ impl TimestampVector { }, } } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + self.array.as_arrow() + } } impl Vector for TimestampVector { @@ -117,10 +121,6 @@ impl Vector for TimestampVector { } } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - self.array.replicate(offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { match self.array.get(index) { Value::Int64(v) => ValueRef::Timestamp(Timestamp::from_millis(v)), @@ -247,6 +247,15 @@ impl ScalarVectorBuilder for TimestampVectorBuilder { } } +pub(crate) fn replicate_timestamp(vector: &TimestampVector, offsets: &[usize]) -> VectorRef { + let array = crate::vectors::primitive::replicate_primitive_with_type( + &vector.array, + 
offsets, + vector.data_type(), + ); + Arc::new(TimestampVector { array }) +} + #[cfg(test)] mod tests { use super::*; @@ -284,4 +293,13 @@ mod tests { vector.iter_data().collect::>() ); } + + #[test] + fn test_timestamp_from_arrow() { + let vector = + TimestampVector::from_slice(&[Timestamp::from_millis(1), Timestamp::from_millis(2)]); + let arrow = vector.as_arrow().slice(0, vector.len()); + let vector2 = TimestampVector::try_from_arrow_array(&arrow).unwrap(); + assert_eq!(vector, vector2); + } }