From e697ba975be2019d8797ac0f0779d859c5e52e11 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 19 Sep 2022 14:05:02 +0800 Subject: [PATCH] feat: Implement dedup and filter for vectors (#245) * feat: Dedup vector * refactor: Re-export Date/DateTime/Timestamp * refactor: Named field for ListValueRef::Ref Use field val instead of tuple for variant ListValueRef::Ref to keep consistence with ListValueRef::Indexed * feat: Implement ScalarVector for ListVector Also implements ScalarVectorBuilder for ListVectorBuilder, Scalar for ListValue and ScalarRef for ListValueRef * test: Add tests for ScalarVector implementation of ListVector * feat: Implement dedup using match_scalar_vector * refactor: Move dedup func to individual mod * chore: Update ListValueRef comments * refactor: Move replicate to VectorOp Move compute operations to VectorOp trait and acts as an super trait of Vector. So we could later put dedup/filter methods to VectorOp trait, avoid to define too many methods in Vector trait. * refactor: Move scalar bounds to PrimitiveElement Move Scalar and ScalarRef trait bounds to PrimitiveElement, so for each native type which implements PrimitiveElement, its PrimitiveVector always implements ScalarVector, so we could use it as ScalarVector without adding additional trait bounds * refactor: Move dedup to VectorOp Remove compute mod and move dedup logic to operations::dedup * feat: Implement VectorOp::filter * test: Move replicate test of primitive to replicate.rs * test: Add more replicate tests * test: Add tests for dedup and filter Also fix NullVector::dedup and ConstantVector::dedup * style: fix clippy * chore: Remove unused scalar.rs * test: Add more tests for VectorOp and fix failed tests Also fix TimestampVector eq not implemented. * chore: Address CR comments * chore: mention vector should be sorted in comment * refactor: slice the vector directly in replicate_primitive_with_type --- src/common/time/src/lib.rs | 3 + src/datatypes/src/error.rs | 6 + src/datatypes/src/scalar.rs | 1 - src/datatypes/src/scalars.rs | 99 ++++++-- src/datatypes/src/scalars/common.rs | 23 -- src/datatypes/src/type_id.rs | 7 +- src/datatypes/src/types/list_type.rs | 2 +- src/datatypes/src/types/primitive_type.rs | 9 +- src/datatypes/src/value.rs | 31 ++- src/datatypes/src/vectors.rs | 18 +- src/datatypes/src/vectors/binary.rs | 12 +- src/datatypes/src/vectors/boolean.rs | 15 +- src/datatypes/src/vectors/constant.rs | 58 +++-- src/datatypes/src/vectors/date.rs | 25 +- src/datatypes/src/vectors/datetime.rs | 25 +- src/datatypes/src/vectors/eq.rs | 38 +-- src/datatypes/src/vectors/list.rs | 207 +++++++++++++--- src/datatypes/src/vectors/null.rs | 21 +- src/datatypes/src/vectors/operations.rs | 121 ++++++++++ src/datatypes/src/vectors/operations/dedup.rs | 223 ++++++++++++++++++ .../src/vectors/operations/filter.rs | 114 +++++++++ .../src/vectors/operations/replicate.rs | 108 +++++++++ src/datatypes/src/vectors/primitive.rs | 135 ++++++----- src/datatypes/src/vectors/string.rs | 12 +- src/datatypes/src/vectors/timestamp.rs | 26 +- 25 files changed, 1114 insertions(+), 225 deletions(-) delete mode 100644 src/datatypes/src/scalar.rs delete mode 100644 src/datatypes/src/scalars/common.rs create mode 100644 src/datatypes/src/vectors/operations.rs create mode 100644 src/datatypes/src/vectors/operations/dedup.rs create mode 100644 src/datatypes/src/vectors/operations/filter.rs create mode 100644 src/datatypes/src/vectors/operations/replicate.rs diff --git a/src/common/time/src/lib.rs b/src/common/time/src/lib.rs index 3b2efc4af2..44c907e7cb 100644 --- a/src/common/time/src/lib.rs +++ b/src/common/time/src/lib.rs @@ -6,5 +6,8 @@ pub mod timestamp; pub mod timestamp_millis; pub mod util; +pub use date::Date; +pub use datetime::DateTime; pub use range::RangeMillis; +pub use timestamp::Timestamp; pub use timestamp_millis::TimestampMillis; diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index d67cfcf9a5..efa105a312 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -50,6 +50,12 @@ pub enum Error { #[snafu(display("{}", msg))] CastType { msg: String, backtrace: Backtrace }, + + #[snafu(display("Arrow failed to compute, source: {}", source))] + ArrowCompute { + source: arrow::error::ArrowError, + backtrace: Backtrace, + }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/scalar.rs b/src/datatypes/src/scalar.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/src/datatypes/src/scalar.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index b9463b8bcd..53b105434b 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -1,14 +1,11 @@ use std::any::Any; -use common_time::timestamp::Timestamp; +use common_time::{Date, DateTime, Timestamp}; use crate::prelude::*; -use crate::vectors::date::DateVector; -use crate::vectors::datetime::DateTimeVector; +use crate::value::{ListValue, ListValueRef}; use crate::vectors::*; -pub mod common; - fn get_iter_capacity>(iter: &I) -> usize { match iter.size_hint() { (_lower, Some(upper)) => upper, @@ -244,9 +241,9 @@ impl<'a> ScalarRef<'a> for &'a [u8] { } } -impl Scalar for common_time::date::Date { +impl Scalar for Date { type VectorType = DateVector; - type RefType<'a> = common_time::date::Date; + type RefType<'a> = Date; fn as_scalar_ref(&self) -> Self::RefType<'_> { *self @@ -257,18 +254,18 @@ impl Scalar for common_time::date::Date { } } -impl<'a> ScalarRef<'a> for common_time::date::Date { +impl<'a> ScalarRef<'a> for Date { type VectorType = DateVector; - type ScalarType = common_time::date::Date; + type ScalarType = Date; fn to_owned_scalar(&self) -> Self::ScalarType { *self } } -impl Scalar for common_time::datetime::DateTime { +impl Scalar for DateTime { type VectorType = DateTimeVector; - type RefType<'a> = common_time::datetime::DateTime; + type RefType<'a> = DateTime; fn as_scalar_ref(&self) -> Self::RefType<'_> { *self @@ -279,9 +276,9 @@ impl Scalar for common_time::datetime::DateTime { } } -impl<'a> ScalarRef<'a> for common_time::datetime::DateTime { +impl<'a> ScalarRef<'a> for DateTime { type VectorType = DateTimeVector; - type ScalarType = common_time::datetime::DateTime; + type ScalarType = DateTime; fn to_owned_scalar(&self) -> Self::ScalarType { *self @@ -310,10 +307,41 @@ impl<'a> ScalarRef<'a> for Timestamp { } } +impl Scalar for ListValue { + type VectorType = ListVector; + type RefType<'a> = ListValueRef<'a>; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + ListValueRef::Ref { val: self } + } + + fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> { + long + } +} + +impl<'a> ScalarRef<'a> for ListValueRef<'a> { + type VectorType = ListVector; + type ScalarType = ListValue; + + fn to_owned_scalar(&self) -> Self::ScalarType { + match self { + ListValueRef::Indexed { vector, idx } => match vector.get(*idx) { + // Normally should not get `Value::Null` if the `ListValueRef` comes + // from the iterator of the ListVector, but we avoid panic and just + // returns a default list value in such case since `ListValueRef` may + // be constructed manually. + Value::Null => ListValue::default(), + Value::List(v) => v, + _ => unreachable!(), + }, + ListValueRef::Ref { val } => (*val).clone(), + } + } +} + #[cfg(test)] mod tests { - use common_time::date::Date; - use super::*; use crate::vectors::binary::BinaryVector; use crate::vectors::primitive::Int32Vector; @@ -357,7 +385,7 @@ mod tests { } #[test] - pub fn test_build_date_vector() { + fn test_build_date_vector() { let expect: Vec> = vec![ Some(Date::new(0)), Some(Date::new(-1)), @@ -369,14 +397,49 @@ mod tests { } #[test] - pub fn test_date_scalar() { + fn test_date_scalar() { let date = Date::new(1); assert_eq!(date, date.as_scalar_ref()); assert_eq!(date, date.to_owned_scalar()); } #[test] - pub fn test_build_timestamp_vector() { + fn test_datetime_scalar() { + let dt = DateTime::new(123); + assert_eq!(dt, dt.as_scalar_ref()); + assert_eq!(dt, dt.to_owned_scalar()); + } + + #[test] + fn test_list_value_scalar() { + let list_value = ListValue::new( + Some(Box::new(vec![Value::Int32(123)])), + ConcreteDataType::int32_datatype(), + ); + let list_ref = ListValueRef::Ref { val: &list_value }; + assert_eq!(list_ref, list_value.as_scalar_ref()); + assert_eq!(list_value, list_ref.to_owned_scalar()); + + let mut builder = + ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 1); + builder.push(None); + builder.push(Some(list_value.as_scalar_ref())); + let vector = builder.finish(); + + let ref_on_vec = ListValueRef::Indexed { + vector: &vector, + idx: 0, + }; + assert_eq!(ListValue::default(), ref_on_vec.to_owned_scalar()); + let ref_on_vec = ListValueRef::Indexed { + vector: &vector, + idx: 1, + }; + assert_eq!(list_value, ref_on_vec.to_owned_scalar()); + } + + #[test] + fn test_build_timestamp_vector() { let expect: Vec> = vec![Some(10.into()), None, Some(42.into())]; let vector: TimestampVector = build_vector_from_slice(&expect); assert_vector_eq(&expect, &vector); diff --git a/src/datatypes/src/scalars/common.rs b/src/datatypes/src/scalars/common.rs deleted file mode 100644 index 29e6b90517..0000000000 --- a/src/datatypes/src/scalars/common.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::prelude::*; - -pub fn replicate_scalar_vector(c: &C, offsets: &[usize]) -> VectorRef { - debug_assert!( - offsets.len() == c.len(), - "Size of offsets must match size of vector" - ); - - if offsets.is_empty() { - return c.slice(0, 0); - } - let mut builder = <::Builder>::with_capacity(c.len()); - - let mut previous_offset = 0; - for (i, offset) in offsets.iter().enumerate() { - let data = c.get_data(i); - for _ in previous_offset..*offset { - builder.push(data); - } - previous_offset = *offset; - } - builder.to_vector() -} diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 148e3e9995..0f96bbc083 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -1,6 +1,3 @@ -#[cfg(any(test, feature = "test"))] -use crate::data_type::ConcreteDataType; - /// Unique identifier for logical data type. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum LogicalTypeId { @@ -43,7 +40,9 @@ impl LogicalTypeId { /// # Panics /// Panics if data type is not supported. #[cfg(any(test, feature = "test"))] - pub fn data_type(&self) -> ConcreteDataType { + pub fn data_type(&self) -> crate::data_type::ConcreteDataType { + use crate::data_type::ConcreteDataType; + match self { LogicalTypeId::Null => ConcreteDataType::null_datatype(), LogicalTypeId::Boolean => ConcreteDataType::boolean_datatype(), diff --git a/src/datatypes/src/types/list_type.rs b/src/datatypes/src/types/list_type.rs index eccc49c6d5..bb95b3fc10 100644 --- a/src/datatypes/src/types/list_type.rs +++ b/src/datatypes/src/types/list_type.rs @@ -45,7 +45,7 @@ impl DataType for ListType { } fn create_mutable_vector(&self, capacity: usize) -> Box { - Box::new(ListVectorBuilder::with_capacity( + Box::new(ListVectorBuilder::with_type_capacity( *self.inner.clone(), capacity, )) diff --git a/src/datatypes/src/types/primitive_type.rs b/src/datatypes/src/types/primitive_type.rs index afce150fd8..ad2d59773d 100644 --- a/src/datatypes/src/types/primitive_type.rs +++ b/src/datatypes/src/types/primitive_type.rs @@ -10,6 +10,7 @@ use snafu::OptionExt; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Result}; use crate::scalars::ScalarVectorBuilder; +use crate::scalars::{Scalar, ScalarRef}; use crate::type_id::LogicalTypeId; use crate::types::primitive_traits::Primitive; use crate::value::{Value, ValueRef}; @@ -30,7 +31,13 @@ impl PartialEq> for PrimitiveType Eq for PrimitiveType {} /// A trait that provide helper methods for a primitive type to implementing the [PrimitiveVector]. -pub trait PrimitiveElement: Primitive { +pub trait PrimitiveElement +where + for<'a> Self: Primitive + + Scalar> + + ScalarRef<'a, ScalarType = Self, VectorType = PrimitiveVector> + + Scalar = Self>, +{ /// Construct the data type struct. fn build_data_type() -> ConcreteDataType; diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index c2aa38dd56..3c99a88a2b 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -110,7 +110,7 @@ impl Value { Value::Binary(v) => ValueRef::Binary(v), Value::Date(v) => ValueRef::Date(*v), Value::DateTime(v) => ValueRef::DateTime(*v), - Value::List(v) => ValueRef::List(ListValueRef::Ref(v)), + Value::List(v) => ValueRef::List(ListValueRef::Ref { val: v }), Value::Timestamp(v) => ValueRef::Timestamp(*v), } } @@ -282,6 +282,12 @@ impl ListValue { } } +impl Default for ListValue { + fn default() -> ListValue { + ListValue::new(None, ConcreteDataType::null_datatype()) + } +} + impl PartialOrd for ListValue { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -464,19 +470,32 @@ impl<'a> From<&'a [u8]> for ValueRef<'a> { } } +impl<'a> From>> for ValueRef<'a> { + fn from(list: Option) -> ValueRef { + match list { + Some(v) => ValueRef::List(v), + None => ValueRef::Null, + } + } +} + /// Reference to a [ListValue]. -// Comparison still requires some allocation (call of `to_value()`) and might be avoidable. +/// +/// Now comparison still requires some allocation (call of `to_value()`) and +/// might be avoidable by downcasting and comparing the underlying array slice +/// if it becomes bottleneck. #[derive(Debug, Clone, Copy)] pub enum ListValueRef<'a> { Indexed { vector: &'a ListVector, idx: usize }, - Ref(&'a ListValue), + Ref { val: &'a ListValue }, } impl<'a> ListValueRef<'a> { + /// Convert self to [Value]. This method would clone the underlying data. fn to_value(self) -> Value { match self { ListValueRef::Indexed { vector, idx } => vector.get(idx), - ListValueRef::Ref(v) => Value::List((*v).clone()), + ListValueRef::Ref { val } => Value::List(val.clone()), } } } @@ -796,7 +815,7 @@ mod tests { datatype: ConcreteDataType::int32_datatype(), }; assert_eq!( - ValueRef::List(ListValueRef::Ref(&list)), + ValueRef::List(ListValueRef::Ref { val: &list }), Value::List(list.clone()).as_value_ref() ); } @@ -831,7 +850,7 @@ mod tests { items: None, datatype: ConcreteDataType::int32_datatype(), }; - check_as_correct!(ListValueRef::Ref(&list), List, as_list); + check_as_correct!(ListValueRef::Ref { val: &list }, List, as_list); let wrong_value = ValueRef::Int32(12345); assert!(wrong_value.as_binary().is_err()); diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 1d9a45585e..6ba9ac841f 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -9,10 +9,21 @@ mod helper; mod list; pub mod mutable; pub mod null; +mod operations; pub mod primitive; mod string; mod timestamp; +pub mod all { + //! All vector types. + pub use crate::vectors::{ + BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Float32Vector, + Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, ListVector, NullVector, + PrimitiveVector, StringVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector, + UInt8Vector, + }; +} + use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -29,6 +40,7 @@ pub use helper::Helper; pub use list::*; pub use mutable::MutableVector; pub use null::*; +pub use operations::VectorOp; pub use primitive::*; use snafu::ensure; pub use string::*; @@ -59,7 +71,7 @@ impl<'a> Validity<'a> { } /// Vector of data values. -pub trait Vector: Send + Sync + Serializable + Debug { +pub trait Vector: Send + Sync + Serializable + Debug + VectorOp { /// Returns the data type of the vector. /// /// This may require heap allocation. @@ -140,10 +152,6 @@ pub trait Vector: Send + Sync + Serializable + Debug { Ok(self.get(index)) } - // Copies each element according offsets parameter. - // (i-th element should be copied offsets[i] - offsets[i - 1] times.) - fn replicate(&self, offsets: &[usize]) -> VectorRef; - /// Returns the reference of value at `index`. /// /// # Panics diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index cd4a09e405..d4332976e5 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -9,7 +9,7 @@ use snafu::{OptionExt, ResultExt}; use crate::arrow_array::{BinaryArray, MutableBinaryArray}; use crate::data_type::ConcreteDataType; use crate::error::{self, Result}; -use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; +use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; @@ -20,6 +20,12 @@ pub struct BinaryVector { array: BinaryArray, } +impl BinaryVector { + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } +} + impl From for BinaryVector { fn from(array: BinaryArray) -> Self { Self { array } @@ -79,10 +85,6 @@ impl Vector for BinaryVector { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - common::replicate_scalar_vector(self, offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { vectors::impl_get_ref_for_vector!(self.array, index) } diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index 3cc682d6dc..d28a825ee9 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -8,7 +8,6 @@ use snafu::{OptionExt, ResultExt}; use crate::data_type::ConcreteDataType; use crate::error::Result; -use crate::scalars::common::replicate_scalar_vector; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; @@ -20,6 +19,16 @@ pub struct BooleanVector { array: BooleanArray, } +impl BooleanVector { + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } + + pub(crate) fn as_boolean_array(&self) -> &BooleanArray { + &self.array + } +} + impl From> for BooleanVector { fn from(data: Vec) -> Self { BooleanVector { @@ -95,10 +104,6 @@ impl Vector for BooleanVector { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - replicate_scalar_vector(self, offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { vectors::impl_get_ref_for_vector!(self.array, index) } diff --git a/src/datatypes/src/vectors/constant.rs b/src/datatypes/src/vectors/constant.rs index fa8cdb02af..dcbd8b87b5 100644 --- a/src/datatypes/src/vectors/constant.rs +++ b/src/datatypes/src/vectors/constant.rs @@ -10,7 +10,7 @@ use crate::error::{Result, SerializeSnafu}; use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; use crate::vectors::Helper; -use crate::vectors::{Validity, Vector, VectorRef}; +use crate::vectors::{BooleanVector, Validity, Vector, VectorRef}; #[derive(Clone)] pub struct ConstantVector { @@ -19,7 +19,13 @@ pub struct ConstantVector { } impl ConstantVector { + /// Create a new [ConstantVector]. + /// + /// # Panics + /// Panics if `vector.len() != 1`. pub fn new(vector: VectorRef, length: usize) -> Self { + assert_eq!(1, vector.len()); + // Avoid const recursion. if vector.is_const() { let vec: &ConstantVector = unsafe { Helper::static_cast(&vector) }; @@ -31,6 +37,11 @@ impl ConstantVector { pub fn inner(&self) -> &VectorRef { &self.vector } + + /// Returns the constant value. + pub fn get_constant_ref(&self) -> ValueRef { + self.vector.get_ref(0) + } } impl Vector for ConstantVector { @@ -95,15 +106,6 @@ impl Vector for ConstantVector { self.vector.get(0) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - debug_assert!( - offsets.len() == self.len(), - "Size of offsets must match size of column" - ); - - Arc::new(Self::new(self.vector.clone(), *offsets.last().unwrap())) - } - fn get_ref(&self, _index: usize) -> ValueRef { self.vector.get_ref(0) } @@ -111,18 +113,13 @@ impl Vector for ConstantVector { impl fmt::Debug for ConstantVector { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "ConstantVector([{:?}; {}])", - self.try_get(0).unwrap_or(Value::Null), - self.len() - ) + write!(f, "ConstantVector([{:?}; {}])", self.get(0), self.len()) } } impl Serializable for ConstantVector { fn serialize_to_json(&self) -> Result> { - std::iter::repeat(self.try_get(0)?) + std::iter::repeat(self.get(0)) .take(self.len()) .map(serde_json::Value::try_from) .collect::>() @@ -130,6 +127,33 @@ impl Serializable for ConstantVector { } } +pub(crate) fn replicate_constant(vector: &ConstantVector, offsets: &[usize]) -> VectorRef { + assert_eq!(offsets.len(), vector.len()); + + if offsets.is_empty() { + return vector.slice(0, 0); + } + + Arc::new(ConstantVector::new( + vector.vector.clone(), + *offsets.last().unwrap(), + )) +} + +pub(crate) fn filter_constant( + vector: &ConstantVector, + filter: &BooleanVector, +) -> Result { + let length = filter.len() - filter.as_boolean_array().values().null_count(); + if length == vector.len() { + return Ok(Arc::new(vector.clone())); + } + Ok(Arc::new(ConstantVector::new( + vector.inner().clone(), + length, + ))) +} + #[cfg(test)] mod tests { use arrow::datatypes::DataType as ArrowDataType; diff --git a/src/datatypes/src/vectors/date.rs b/src/datatypes/src/vectors/date.rs index c9f4cb1a56..060cb892a5 100644 --- a/src/datatypes/src/vectors/date.rs +++ b/src/datatypes/src/vectors/date.rs @@ -36,6 +36,10 @@ impl DateVector { .clone(), )) } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + self.array.as_arrow() + } } impl Vector for DateVector { @@ -103,10 +107,6 @@ impl Vector for DateVector { } } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - self.array.replicate(offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { match self.array.get(index) { Value::Int32(v) => ValueRef::Date(Date::new(v)), @@ -236,6 +236,15 @@ impl ScalarVectorBuilder for DateVectorBuilder { } } +pub(crate) fn replicate_date(vector: &DateVector, offsets: &[usize]) -> VectorRef { + let array = crate::vectors::primitive::replicate_primitive_with_type( + &vector.array, + offsets, + vector.data_type(), + ); + Arc::new(DateVector { array }) +} + #[cfg(test)] mod tests { use super::*; @@ -293,4 +302,12 @@ mod tests { ])); assert_eq!(expect, vector); } + + #[test] + fn test_date_from_arrow() { + let vector = DateVector::from_slice(&[Date::new(1), Date::new(2)]); + let arrow = vector.as_arrow().slice(0, vector.len()); + let vector2 = DateVector::try_from_arrow_array(&arrow).unwrap(); + assert_eq!(vector, vector2); + } } diff --git a/src/datatypes/src/vectors/datetime.rs b/src/datatypes/src/vectors/datetime.rs index 2b95a67533..9bd36cc165 100644 --- a/src/datatypes/src/vectors/datetime.rs +++ b/src/datatypes/src/vectors/datetime.rs @@ -37,6 +37,10 @@ impl DateTimeVector { .clone(), )) } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + self.array.as_arrow() + } } impl Vector for DateTimeVector { @@ -104,10 +108,6 @@ impl Vector for DateTimeVector { } } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - self.array.replicate(offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { match self.array.get(index) { Value::Int64(v) => ValueRef::DateTime(DateTime::new(v)), @@ -236,6 +236,15 @@ impl ScalarVector for DateTimeVector { } } +pub(crate) fn replicate_datetime(vector: &DateTimeVector, offsets: &[usize]) -> VectorRef { + let array = crate::vectors::primitive::replicate_primitive_with_type( + &vector.array, + offsets, + vector.data_type(), + ); + Arc::new(DateTimeVector { array }) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -312,4 +321,12 @@ mod tests { ])); assert_eq!(expect, vector); } + + #[test] + fn test_datetime_from_arrow() { + let vector = DateTimeVector::from_slice(&[DateTime::new(1), DateTime::new(2)]); + let arrow = vector.as_arrow().slice(0, vector.len()); + let vector2 = DateTimeVector::try_from_arrow_array(&arrow).unwrap(); + assert_eq!(vector, vector2); + } } diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index 6afb793e0c..ea979b1c53 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::data_type::DataType; use crate::vectors::{ BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, ListVector, - PrimitiveVector, StringVector, Vector, + PrimitiveVector, StringVector, TimestampVector, Vector, }; use crate::with_match_primitive_type_id; @@ -54,6 +54,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { use crate::data_type::ConcreteDataType::*; + let lhs_type = lhs.data_type(); match lhs.data_type() { Null(_) => true, Boolean(_) => is_vector_eq!(BooleanVector, lhs, rhs), @@ -61,27 +62,32 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { String(_) => is_vector_eq!(StringVector, lhs, rhs), Date(_) => is_vector_eq!(DateVector, lhs, rhs), DateTime(_) => is_vector_eq!(DateTimeVector, lhs, rhs), + Timestamp(_) => is_vector_eq!(TimestampVector, lhs, rhs), List(_) => is_vector_eq!(ListVector, lhs, rhs), - other => with_match_primitive_type_id!(other.logical_type_id(), |$T| { - let lhs = lhs.as_any().downcast_ref::>().unwrap(); - let rhs = rhs.as_any().downcast_ref::>().unwrap(); + UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_) + | Float32(_) | Float64(_) => { + with_match_primitive_type_id!(lhs_type.logical_type_id(), |$T| { + let lhs = lhs.as_any().downcast_ref::>().unwrap(); + let rhs = rhs.as_any().downcast_ref::>().unwrap(); - lhs == rhs - }, - { - unreachable!() - }), + lhs == rhs + }, + { + unreachable!("should not compare {} with {}", lhs.vector_type_name(), rhs.vector_type_name()) + }) + } } } #[cfg(test)] mod tests { - use arrow::array::{Int64Array, ListArray, MutableListArray, MutablePrimitiveArray, TryExtend}; + use arrow::array::{ListArray, MutableListArray, MutablePrimitiveArray, TryExtend}; use super::*; use crate::vectors::{ Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, - NullVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, VectorRef, + NullVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, + VectorRef, }; fn assert_vector_ref_eq(vector: VectorRef) { @@ -111,10 +117,8 @@ mod tests { ))); assert_vector_ref_eq(Arc::new(BooleanVector::from(vec![true, false]))); assert_vector_ref_eq(Arc::new(DateVector::from(vec![Some(100), Some(120)]))); - assert_vector_ref_eq(Arc::new(DateTimeVector::new(Int64Array::from(vec![ - Some(100), - Some(120), - ])))); + assert_vector_ref_eq(Arc::new(DateTimeVector::from(vec![Some(100), Some(120)]))); + assert_vector_ref_eq(Arc::new(TimestampVector::from_values([100, 120]))); let mut arrow_array = MutableListArray::>::new(); arrow_array @@ -171,7 +175,7 @@ mod tests { 5, )), Arc::new(ConstantVector::new( - Arc::new(BooleanVector::from(vec![true, false])), + Arc::new(BooleanVector::from(vec![false])), 4, )), ); @@ -181,7 +185,7 @@ mod tests { 5, )), Arc::new(ConstantVector::new( - Arc::new(Int32Vector::from_slice(vec![1, 2])), + Arc::new(Int32Vector::from_slice(vec![1])), 4, )), ); diff --git a/src/datatypes/src/vectors/list.rs b/src/datatypes/src/vectors/list.rs index 6269f550a9..d5bb24bcfc 100644 --- a/src/datatypes/src/vectors/list.rs +++ b/src/datatypes/src/vectors/list.rs @@ -1,7 +1,9 @@ use std::any::Any; +use std::ops::Range; use std::sync::Arc; use arrow::array::{Array, ArrayRef, ListArray}; +use arrow::bitmap::utils::ZipValidity; use arrow::bitmap::MutableBitmap; use arrow::datatypes::DataType as ArrowDataType; use serde_json::Value as JsonValue; @@ -24,9 +26,17 @@ pub struct ListVector { } impl ListVector { + /// Only iterate values in the [ListVector]. + /// + /// Be careful to use this method as it would ignore validity and replace null + /// by empty vector. pub fn values_iter(&self) -> Box> + '_> { Box::new(self.array.values_iter().map(VectorHelper::try_into_vector)) } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } } impl Vector for ListVector { @@ -93,13 +103,6 @@ impl Vector for ListVector { )) } - fn replicate(&self, _: &[usize]) -> VectorRef { - // ListVector can be a scalar vector for implementing this `replicate` method. However, - // that requires a lot of efforts, starting from not using Arrow's ListArray. - // Refer to Databend's `ArrayColumn` for more details. - unimplemented!() - } - fn get_ref(&self, index: usize) -> ValueRef { ValueRef::List(ListValueRef::Indexed { vector: self, @@ -137,6 +140,70 @@ impl From for ListVector { impl_try_from_arrow_array_for_vector!(ArrowListArray, ListVector); +pub struct ListVectorIter<'a> { + vector: &'a ListVector, + iter: ZipValidity<'a, usize, Range>, +} + +impl<'a> ListVectorIter<'a> { + pub fn new(vector: &'a ListVector) -> ListVectorIter<'a> { + let iter = ZipValidity::new( + 0..vector.len(), + vector.array.validity().as_ref().map(|x| x.iter()), + ); + + Self { vector, iter } + } +} + +impl<'a> Iterator for ListVectorIter<'a> { + type Item = Option>; + + #[inline] + fn next(&mut self) -> Option { + self.iter.next().map(|idx_opt| { + idx_opt.map(|idx| ListValueRef::Indexed { + vector: self.vector, + idx, + }) + }) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + + #[inline] + fn nth(&mut self, n: usize) -> Option { + self.iter.nth(n).map(|idx_opt| { + idx_opt.map(|idx| ListValueRef::Indexed { + vector: self.vector, + idx, + }) + }) + } +} + +impl ScalarVector for ListVector { + type OwnedItem = ListValue; + type RefItem<'a> = ListValueRef<'a>; + type Iter<'a> = ListVectorIter<'a>; + type Builder = ListVectorBuilder; + + fn get_data(&self, idx: usize) -> Option> { + if self.array.is_valid(idx) { + Some(ListValueRef::Indexed { vector: self, idx }) + } else { + None + } + } + + fn iter_data(&self) -> Self::Iter<'_> { + ListVectorIter::new(self) + } +} + // Some codes are ported from arrow2's MutableListArray. pub struct ListVectorBuilder { inner_type: ConcreteDataType, @@ -146,7 +213,7 @@ pub struct ListVectorBuilder { } impl ListVectorBuilder { - pub fn with_capacity(inner_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder { + pub fn with_type_capacity(inner_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder { let mut offsets = Vec::with_capacity(capacity + 1); offsets.push(0); // The actual required capacity might greater than the capacity of the `ListVector` @@ -224,19 +291,7 @@ impl MutableVector for ListVectorBuilder { } fn to_vector(&mut self) -> VectorRef { - let array = ArrowListArray::try_new( - ConcreteDataType::list_datatype(self.inner_type.clone()).as_arrow_type(), - std::mem::take(&mut self.offsets).into(), - self.values.to_vector().to_arrow_array(), - std::mem::take(&mut self.validity).map(|x| x.into()), - ) - .unwrap(); // The `ListVectorBuilder` itself should ensure it always builds a valid array. - - let vector = ListVector { - array, - inner_datatype: self.inner_type.clone(), - }; - Arc::new(vector) + Arc::new(self.finish()) } fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { @@ -246,7 +301,7 @@ impl MutableVector for ListVectorBuilder { Some(list_value) => self.push_list_value(list_value)?, None => self.push_null(), }, - ListValueRef::Ref(list_value) => self.push_list_value(list_value)?, + ListValueRef::Ref { val } => self.push_list_value(val)?, } } else { self.push_null(); @@ -265,6 +320,41 @@ impl MutableVector for ListVectorBuilder { } } +impl ScalarVectorBuilder for ListVectorBuilder { + type VectorType = ListVector; + + fn with_capacity(_capacity: usize) -> Self { + panic!("Must use ListVectorBuilder::with_type_capacity()"); + } + + fn push(&mut self, value: Option<::RefItem<'_>>) { + // We expect the input ListValue has the same inner type as the builder when using + // push(), so just panic if `push_value_ref()` returns error, which indicate an + // invalid input value type. + self.push_value_ref(value.into()).unwrap_or_else(|e| { + panic!( + "Failed to push value, expect value type {:?}, err:{}", + self.inner_type, e + ); + }); + } + + fn finish(&mut self) -> Self::VectorType { + let array = ArrowListArray::try_new( + ConcreteDataType::list_datatype(self.inner_type.clone()).as_arrow_type(), + std::mem::take(&mut self.offsets).into(), + self.values.to_vector().to_arrow_array(), + std::mem::take(&mut self.validity).map(|x| x.into()), + ) + .unwrap(); // The `ListVectorBuilder` itself should ensure it always builds a valid array. + + ListVector { + array, + inner_datatype: self.inner_type.clone(), + } + } +} + #[cfg(test)] mod tests { use arrow::array::{MutableListArray, MutablePrimitiveArray, TryExtend}; @@ -445,14 +535,16 @@ mod tests { let mut builder = ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3); builder - .push_value_ref(ValueRef::List(ListValueRef::Ref(&ListValue::new( - Some(Box::new(vec![ - Value::Int32(4), - Value::Null, - Value::Int32(6), - ])), - ConcreteDataType::int32_datatype(), - )))) + .push_value_ref(ValueRef::List(ListValueRef::Ref { + val: &ListValue::new( + Some(Box::new(vec![ + Value::Int32(4), + Value::Null, + Value::Int32(6), + ])), + ConcreteDataType::int32_datatype(), + ), + })) .unwrap(); assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); @@ -475,4 +567,59 @@ mod tests { ])); assert_eq!(expect, vector); } + + #[test] + fn test_list_vector_for_scalar() { + let mut builder = + ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 2); + builder.push(None); + builder.push(Some(ListValueRef::Ref { + val: &ListValue::new( + Some(Box::new(vec![ + Value::Int32(4), + Value::Null, + Value::Int32(6), + ])), + ConcreteDataType::int32_datatype(), + ), + })); + let vector = builder.finish(); + + let expect = new_list_vector(vec![None, Some(vec![Some(4), None, Some(6)])]); + assert_eq!(expect, vector); + + assert!(vector.get_data(0).is_none()); + assert_eq!( + ListValueRef::Indexed { + vector: &vector, + idx: 1 + }, + vector.get_data(1).unwrap() + ); + assert_eq!( + *vector.get(1).as_list().unwrap().unwrap(), + vector.get_data(1).unwrap().to_owned_scalar() + ); + + let mut iter = vector.iter_data(); + assert!(iter.next().unwrap().is_none()); + assert_eq!( + ListValueRef::Indexed { + vector: &vector, + idx: 1 + }, + iter.next().unwrap().unwrap() + ); + assert!(iter.next().is_none()); + + let mut iter = vector.iter_data(); + assert_eq!(2, iter.size_hint().0); + assert_eq!( + ListValueRef::Indexed { + vector: &vector, + idx: 1 + }, + iter.nth(1).unwrap().unwrap() + ); + } } diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index 2f483b0ea5..329210886e 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -25,6 +25,10 @@ impl NullVector { array: NullArray::new(ArrowDataType::Null, n), } } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } } impl From for NullVector { @@ -83,17 +87,6 @@ impl Vector for NullVector { Value::Null } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - debug_assert!( - offsets.len() == self.len(), - "Size of offsets must match size of column" - ); - - Arc::new(Self { - array: NullArray::new(ArrowDataType::Null, *offsets.last().unwrap() as usize), - }) - } - fn get_ref(&self, _index: usize) -> ValueRef { // Skips bound check for null array. ValueRef::Null @@ -179,6 +172,12 @@ impl MutableVector for NullVectorBuilder { } } +pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef { + assert_eq!(offsets.len(), vector.len()); + + Arc::new(NullVector::new(*offsets.last().unwrap())) +} + #[cfg(test)] mod tests { use serde_json; diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs new file mode 100644 index 0000000000..ede948ca02 --- /dev/null +++ b/src/datatypes/src/vectors/operations.rs @@ -0,0 +1,121 @@ +mod dedup; +mod filter; +mod replicate; + +use arrow::bitmap::MutableBitmap; + +use crate::error::Result; +use crate::types::PrimitiveElement; +use crate::vectors::all::*; +use crate::vectors::{Vector, VectorRef}; + +/// Vector compute operations. +pub trait VectorOp { + /// Copies each element according `offsets` parameter. + /// (`i-th` element should be copied `offsets[i] - offsets[i - 1]` times.) + /// + /// # Panics + /// Panics if `offsets.len() != self.len()`. + fn replicate(&self, offsets: &[usize]) -> VectorRef; + + /// Dedup elements in `self` and mark `i-th` bit of `selected` to `true` if the `i-th` element + /// of `self` is retained. + /// + /// The caller should ensure + /// 1. the `selected` bitmap is intialized by setting `[0, vector.len())` + /// bits to false. + /// 2. `vector` and `prev_vector` are sorted. + /// + /// If there are multiple duplicate elements, this function retains the **first** element. + /// If the first element of `self` is equal to the last element of `prev_vector`, then that + /// first element is also considered as duplicated and won't be retained. + /// + /// # Panics + /// Panics if + /// - `selected.len() < self.len()`. + /// - `prev_vector` and `self` have different data types. + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>); + + /// Filters the vector, returns elements matching the `filter` (i.e. where the values are true). + /// + /// Note that the nulls of `filter` are interpreted as `false` will lead to these elements being masked out. + fn filter(&self, filter: &BooleanVector) -> Result; +} + +macro_rules! impl_scalar_vector_op { + ($( { $VectorType: ident, $replicate: ident } ),+) => {$( + impl VectorOp for $VectorType { + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::$replicate(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = prev_vector.map(|pv| pv.as_any().downcast_ref::<$VectorType>().unwrap()); + dedup::dedup_scalar(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_non_constant!(self, $VectorType, filter) + } + } + )+}; +} + +impl_scalar_vector_op!( + { BinaryVector, replicate_scalar }, + { BooleanVector, replicate_scalar }, + { ListVector, replicate_scalar }, + { StringVector, replicate_scalar }, + { DateVector, replicate_date }, + { DateTimeVector, replicate_datetime }, + { TimestampVector, replicate_timestamp } +); + +impl VectorOp for ConstantVector { + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::replicate_constant(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::()); + dedup::dedup_constant(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_constant(self, filter) + } +} + +impl VectorOp for NullVector { + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::replicate_null(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::()); + dedup::dedup_null(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_non_constant!(self, NullVector, filter) + } +} + +impl VectorOp for PrimitiveVector +where + T: PrimitiveElement, +{ + fn replicate(&self, offsets: &[usize]) -> VectorRef { + replicate::replicate_primitive(self, offsets) + } + + fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) { + let prev_vector = + prev_vector.and_then(|pv| pv.as_any().downcast_ref::>()); + dedup::dedup_scalar(self, selected, prev_vector); + } + + fn filter(&self, filter: &BooleanVector) -> Result { + filter::filter_non_constant!(self, PrimitiveVector, filter) + } +} diff --git a/src/datatypes/src/vectors/operations/dedup.rs b/src/datatypes/src/vectors/operations/dedup.rs new file mode 100644 index 0000000000..33ea0dfbb7 --- /dev/null +++ b/src/datatypes/src/vectors/operations/dedup.rs @@ -0,0 +1,223 @@ +use arrow::bitmap::MutableBitmap; + +use crate::scalars::ScalarVector; +use crate::vectors::{ConstantVector, NullVector, Vector}; + +pub(crate) fn dedup_scalar<'a, T: ScalarVector>( + vector: &'a T, + selected: &'a mut MutableBitmap, + prev_vector: Option<&'a T>, +) where + T::RefItem<'a>: PartialEq, +{ + assert!(selected.len() >= vector.len()); + + if vector.is_empty() { + return; + } + + for ((i, current), next) in vector + .iter_data() + .enumerate() + .zip(vector.iter_data().skip(1)) + { + if current != next { + // If next element is a different element, we mark it as selected. + selected.set(i + 1, true); + } + } + + // Always retain the first element. + selected.set(0, true); + + // Then check whether still keep the first element based last element in previous vector. + if let Some(pv) = &prev_vector { + if !pv.is_empty() { + let last = pv.get_data(pv.len() - 1); + if last == vector.get_data(0) { + selected.set(0, false); + } + } + } +} + +pub(crate) fn dedup_null( + vector: &NullVector, + selected: &mut MutableBitmap, + prev_vector: Option<&NullVector>, +) { + if vector.is_empty() { + return; + } + + let no_prev_element = prev_vector.map(|v| v.is_empty()).unwrap_or(true); + if no_prev_element { + // Retain first element if no previous element (we known that it must + // be null). + selected.set(0, true); + } +} + +pub(crate) fn dedup_constant( + vector: &ConstantVector, + selected: &mut MutableBitmap, + prev_vector: Option<&ConstantVector>, +) { + if vector.is_empty() { + return; + } + + let equal_to_prev = if let Some(prev) = prev_vector { + !prev.is_empty() && vector.get_constant_ref() == prev.get_constant_ref() + } else { + false + }; + + if !equal_to_prev { + selected.set(0, true); + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::vectors::{Int32Vector, StringVector, VectorOp}; + + fn check_bitmap(expect: &[bool], selected: &MutableBitmap) { + assert_eq!(expect.len(), selected.len()); + for (exp, v) in expect.iter().zip(selected.iter()) { + assert_eq!(*exp, v); + } + } + + fn check_dedup_scalar(expect: &[bool], input: &[i32], prev: Option<&[i32]>) { + check_dedup_scalar_opt(expect, input.iter().map(|v| Some(*v)), prev); + } + + fn check_dedup_scalar_opt( + expect: &[bool], + input: impl Iterator>, + prev: Option<&[i32]>, + ) { + let input = Int32Vector::from_iter(input); + let prev = prev.map(Int32Vector::from_slice); + + let mut selected = MutableBitmap::from_len_zeroed(input.len()); + input.dedup(&mut selected, prev.as_ref().map(|v| v as _)); + + check_bitmap(expect, &selected); + } + + #[test] + fn test_dedup_scalar() { + check_dedup_scalar(&[], &[], None); + check_dedup_scalar(&[true], &[1], None); + check_dedup_scalar(&[true, false], &[1, 1], None); + check_dedup_scalar(&[true, true], &[1, 2], None); + check_dedup_scalar(&[true, true, true, true], &[1, 2, 3, 4], None); + check_dedup_scalar(&[true, false, true, false], &[1, 1, 3, 3], None); + check_dedup_scalar(&[true, false, false, false, true], &[2, 2, 2, 2, 3], None); + + check_dedup_scalar(&[true], &[5], Some(&[])); + check_dedup_scalar(&[true], &[5], Some(&[3])); + check_dedup_scalar(&[false], &[5], Some(&[5])); + check_dedup_scalar(&[false], &[5], Some(&[4, 5])); + check_dedup_scalar(&[false, true], &[5, 6], Some(&[4, 5])); + check_dedup_scalar(&[false, true, false], &[5, 6, 6], Some(&[4, 5])); + check_dedup_scalar( + &[false, true, false, true, true], + &[5, 6, 6, 7, 8], + Some(&[4, 5]), + ); + + check_dedup_scalar_opt( + &[true, true, false, true, false], + [Some(1), Some(2), Some(2), None, None].into_iter(), + None, + ); + } + + fn check_dedup_null(len: usize) { + let input = NullVector::new(len); + let mut selected = MutableBitmap::from_len_zeroed(input.len()); + input.dedup(&mut selected, None); + + let mut expect = vec![false; len]; + if !expect.is_empty() { + expect[0] = true; + } + check_bitmap(&expect, &selected); + + let mut selected = MutableBitmap::from_len_zeroed(input.len()); + let prev = Some(NullVector::new(1)); + input.dedup(&mut selected, prev.as_ref().map(|v| v as _)); + let expect = vec![false; len]; + check_bitmap(&expect, &selected); + } + + #[test] + fn test_dedup_null() { + for len in 0..5 { + check_dedup_null(len); + } + } + + fn check_dedup_constant(len: usize) { + let input = ConstantVector::new(Arc::new(Int32Vector::from_slice(&[8])), len); + let mut selected = MutableBitmap::from_len_zeroed(len); + input.dedup(&mut selected, None); + + let mut expect = vec![false; len]; + if !expect.is_empty() { + expect[0] = true; + } + check_bitmap(&expect, &selected); + + let mut selected = MutableBitmap::from_len_zeroed(len); + let prev = Some(ConstantVector::new( + Arc::new(Int32Vector::from_slice(&[8])), + 1, + )); + input.dedup(&mut selected, prev.as_ref().map(|v| v as _)); + let expect = vec![false; len]; + check_bitmap(&expect, &selected); + } + + #[test] + fn test_dedup_constant() { + for len in 0..5 { + check_dedup_constant(len); + } + } + + #[test] + fn test_dedup_string() { + let input = StringVector::from_slice(&["a", "a", "b", "c"]); + let mut selected = MutableBitmap::from_len_zeroed(4); + input.dedup(&mut selected, None); + let expect = vec![true, false, true, true]; + check_bitmap(&expect, &selected); + } + + macro_rules! impl_dedup_date_like_test { + ($VectorType: ident, $ValueType: ident, $method: ident) => {{ + use common_time::$ValueType; + use $crate::vectors::$VectorType; + + let v = $VectorType::from_iterator([8, 8, 9, 10].into_iter().map($ValueType::$method)); + let mut selected = MutableBitmap::from_len_zeroed(4); + v.dedup(&mut selected, None); + let expect = vec![true, false, true, true]; + check_bitmap(&expect, &selected); + }}; + } + + #[test] + fn test_dedup_date_like() { + impl_dedup_date_like_test!(DateVector, Date, new); + impl_dedup_date_like_test!(DateTimeVector, DateTime, new); + impl_dedup_date_like_test!(TimestampVector, Timestamp, from_millis); + } +} diff --git a/src/datatypes/src/vectors/operations/filter.rs b/src/datatypes/src/vectors/operations/filter.rs new file mode 100644 index 0000000000..4ae0398047 --- /dev/null +++ b/src/datatypes/src/vectors/operations/filter.rs @@ -0,0 +1,114 @@ +pub(crate) use crate::vectors::constant::filter_constant; + +macro_rules! filter_non_constant { + ($vector: expr, $VectorType: ty, $filter: ident) => {{ + use std::sync::Arc; + + use snafu::ResultExt; + + let arrow_array = $vector.as_arrow(); + let filtered = arrow::compute::filter::filter(arrow_array, $filter.as_boolean_array()) + .context(crate::error::ArrowComputeSnafu)?; + Ok(Arc::new(<$VectorType>::try_from_arrow_array(filtered)?)) + }}; +} + +pub(crate) use filter_non_constant; + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::scalars::ScalarVector; + use crate::vectors::{ + BooleanVector, ConstantVector, Int32Vector, NullVector, StringVector, VectorOp, VectorRef, + }; + + fn check_filter_primitive(expect: &[i32], input: &[i32], filter: &[bool]) { + let v = Int32Vector::from_slice(&input); + let filter = BooleanVector::from_slice(filter); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new(Int32Vector::from_slice(&expect)); + assert_eq!(expect, out); + } + + #[test] + fn test_filter_primitive() { + check_filter_primitive(&[], &[], &[]); + check_filter_primitive(&[5], &[5], &[true]); + check_filter_primitive(&[], &[5], &[false]); + check_filter_primitive(&[], &[5, 6], &[false, false]); + check_filter_primitive(&[5, 6], &[5, 6], &[true, true]); + check_filter_primitive(&[], &[5, 6, 7], &[false, false, false]); + check_filter_primitive(&[5], &[5, 6, 7], &[true, false, false]); + check_filter_primitive(&[6], &[5, 6, 7], &[false, true, false]); + check_filter_primitive(&[7], &[5, 6, 7], &[false, false, true]); + check_filter_primitive(&[5, 7], &[5, 6, 7], &[true, false, true]); + } + + fn check_filter_constant(expect_length: usize, input_length: usize, filter: &[bool]) { + let v = ConstantVector::new(Arc::new(Int32Vector::from_slice(&[123])), input_length); + let filter = BooleanVector::from_slice(filter); + let out = v.filter(&filter).unwrap(); + + assert!(out.is_const()); + assert_eq!(expect_length, out.len()); + } + + #[test] + fn test_filter_constant() { + check_filter_constant(0, 0, &[]); + check_filter_constant(1, 1, &[true]); + check_filter_constant(0, 1, &[false]); + check_filter_constant(1, 2, &[false, true]); + check_filter_constant(2, 2, &[true, true]); + check_filter_constant(1, 4, &[false, false, false, true]); + check_filter_constant(2, 4, &[false, true, false, true]); + } + + #[test] + fn test_filter_scalar() { + let v = StringVector::from_slice(&["0", "1", "2", "3"]); + let filter = BooleanVector::from_slice(&[false, true, false, true]); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new(StringVector::from_slice(&["1", "3"])); + assert_eq!(expect, out); + } + + #[test] + fn test_filter_null() { + let v = NullVector::new(5); + let filter = BooleanVector::from_slice(&[false, true, false, true, true]); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new(NullVector::new(3)); + assert_eq!(expect, out); + } + + macro_rules! impl_filter_date_like_test { + ($VectorType: ident, $ValueType: ident, $method: ident) => {{ + use std::sync::Arc; + + use common_time::$ValueType; + use $crate::vectors::{$VectorType, VectorRef}; + + let v = $VectorType::from_iterator((0..5).map($ValueType::$method)); + let filter = BooleanVector::from_slice(&[false, true, false, true, true]); + let out = v.filter(&filter).unwrap(); + + let expect: VectorRef = Arc::new($VectorType::from_iterator( + [1, 3, 4].into_iter().map($ValueType::$method), + )); + assert_eq!(expect, out); + }}; + } + + #[test] + fn test_filter_date_like() { + impl_filter_date_like_test!(DateVector, Date, new); + impl_filter_date_like_test!(DateTimeVector, DateTime, new); + impl_filter_date_like_test!(TimestampVector, Timestamp, from_millis); + } +} diff --git a/src/datatypes/src/vectors/operations/replicate.rs b/src/datatypes/src/vectors/operations/replicate.rs new file mode 100644 index 0000000000..8ed712fd40 --- /dev/null +++ b/src/datatypes/src/vectors/operations/replicate.rs @@ -0,0 +1,108 @@ +use crate::prelude::*; +pub(crate) use crate::vectors::constant::replicate_constant; +pub(crate) use crate::vectors::date::replicate_date; +pub(crate) use crate::vectors::datetime::replicate_datetime; +pub(crate) use crate::vectors::null::replicate_null; +pub(crate) use crate::vectors::primitive::replicate_primitive; +pub(crate) use crate::vectors::timestamp::replicate_timestamp; + +pub(crate) fn replicate_scalar(c: &C, offsets: &[usize]) -> VectorRef { + assert_eq!(offsets.len(), c.len()); + + if offsets.is_empty() { + return c.slice(0, 0); + } + let mut builder = <::Builder>::with_capacity(c.len()); + + let mut previous_offset = 0; + for (i, offset) in offsets.iter().enumerate() { + let data = c.get_data(i); + for _ in previous_offset..*offset { + builder.push(data); + } + previous_offset = *offset; + } + builder.to_vector() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::vectors::{ConstantVector, Int32Vector, NullVector, StringVector, VectorOp}; + + #[test] + fn test_replicate_primitive() { + let v = Int32Vector::from_iterator(0..5); + let offsets = [0, 1, 2, 3, 4]; + + let v = v.replicate(&offsets); + assert_eq!(4, v.len()); + + for i in 0..4 { + assert_eq!(Value::Int32(i as i32 + 1), v.get(i)); + } + } + + #[test] + fn test_replicate_scalar() { + let v = StringVector::from_slice(&["0", "1", "2", "3"]); + let offsets = [1, 3, 5, 6]; + + let v = v.replicate(&offsets); + assert_eq!(6, v.len()); + + let expect: VectorRef = Arc::new(StringVector::from_slice(&["0", "1", "1", "2", "2", "3"])); + assert_eq!(expect, v); + } + + #[test] + fn test_replicate_constant() { + let v = Arc::new(StringVector::from_slice(&["hello"])); + let cv = ConstantVector::new(v.clone(), 2); + let offsets = [1, 4]; + + let cv = cv.replicate(&offsets); + assert_eq!(4, cv.len()); + + let expect: VectorRef = Arc::new(ConstantVector::new(v, 4)); + assert_eq!(expect, cv); + } + + #[test] + fn test_replicate_null() { + let v = NullVector::new(3); + let offsets = [1, 3, 5]; + + let v = v.replicate(&offsets); + assert_eq!(5, v.len()); + } + + macro_rules! impl_replicate_date_like_test { + ($VectorType: ident, $ValueType: ident, $method: ident) => {{ + use common_time::$ValueType; + use $crate::vectors::$VectorType; + + let v = $VectorType::from_iterator((0..5).map($ValueType::$method)); + let offsets = [0, 1, 2, 3, 4]; + + let v = v.replicate(&offsets); + assert_eq!(4, v.len()); + + for i in 0..4 { + assert_eq!( + Value::$ValueType($ValueType::$method((i as i32 + 1).into())), + v.get(i) + ); + } + }}; + } + + #[test] + fn test_replicate_date_like() { + impl_replicate_date_like_test!(DateVector, Date, new); + impl_replicate_date_like_test!(DateTimeVector, DateTime, new); + impl_replicate_date_like_test!(TimestampVector, Timestamp, from_millis); + } +} diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 642cee4940..a014c3cb22 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -8,7 +8,7 @@ use arrow::bitmap::utils::ZipValidity; use serde_json::Value as JsonValue; use snafu::{OptionExt, ResultExt}; -use crate::data_type::ConcreteDataType; +use crate::data_type::{ConcreteDataType, DataType}; use crate::error::ConversionSnafu; use crate::error::{Result, SerializeSnafu}; use crate::scalars::{Scalar, ScalarRef}; @@ -59,6 +59,14 @@ impl PrimitiveVector { array: PrimitiveArray::from_values(iter), } } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } + + fn slice(&self, offset: usize, length: usize) -> Self { + Self::from(self.array.slice(offset, length)) + } } impl Vector for PrimitiveVector { @@ -99,40 +107,13 @@ impl Vector for PrimitiveVector { } fn slice(&self, offset: usize, length: usize) -> VectorRef { - Arc::new(Self::from(self.array.slice(offset, length))) + Arc::new(self.slice(offset, length)) } fn get(&self, index: usize) -> Value { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - debug_assert!( - offsets.len() == self.len(), - "Size of offsets must match size of column" - ); - - if offsets.is_empty() { - return self.slice(0, 0); - } - - let mut builder = - PrimitiveVectorBuilder::::with_capacity(*offsets.last().unwrap() as usize); - - let mut previous_offset = 0; - - for (i, offset) in offsets.iter().enumerate() { - let data = unsafe { self.array.value_unchecked(i) }; - builder.mutable_array.extend( - std::iter::repeat(data) - .take(*offset - previous_offset) - .map(Option::Some), - ); - previous_offset = *offset; - } - builder.to_vector() - } - fn get_ref(&self, index: usize) -> ValueRef { if self.array.is_valid(index) { // Safety: The index have been checked by `is_valid()`. @@ -167,9 +148,7 @@ impl>> FromIterator for Pr impl ScalarVector for PrimitiveVector where - T: Scalar + PrimitiveElement, - for<'a> T: ScalarRef<'a, ScalarType = T, VectorType = Self>, - for<'a> T: Scalar = T>, + T: PrimitiveElement, { type OwnedItem = T; type RefItem<'a> = T; @@ -216,16 +195,18 @@ impl<'a, T: Copy> Iterator for PrimitiveIter<'a, T> { } } -pub struct PrimitiveVectorBuilder { - pub(crate) mutable_array: MutablePrimitiveArray, +impl Serializable for PrimitiveVector { + fn serialize_to_json(&self) -> Result> { + self.array + .iter() + .map(serde_json::to_value) + .collect::>() + .context(SerializeSnafu) + } } -impl PrimitiveVectorBuilder { - fn with_capacity(capacity: usize) -> Self { - Self { - mutable_array: MutablePrimitiveArray::with_capacity(capacity), - } - } +pub struct PrimitiveVectorBuilder { + pub(crate) mutable_array: MutablePrimitiveArray, } pub type UInt8VectorBuilder = PrimitiveVectorBuilder; @@ -259,9 +240,7 @@ impl MutableVector for PrimitiveVectorBuilder { } fn to_vector(&mut self) -> VectorRef { - Arc::new(PrimitiveVector:: { - array: std::mem::take(&mut self.mutable_array).into(), - }) + Arc::new(self.finish()) } fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { @@ -304,16 +283,58 @@ where } } -impl Serializable for PrimitiveVector { - fn serialize_to_json(&self) -> Result> { - self.array - .iter() - .map(serde_json::to_value) - .collect::>() - .context(SerializeSnafu) +impl PrimitiveVectorBuilder { + fn with_type_capacity(data_type: ConcreteDataType, capacity: usize) -> Self { + Self { + mutable_array: MutablePrimitiveArray::with_capacity_from( + capacity, + data_type.as_arrow_type(), + ), + } } } +pub(crate) fn replicate_primitive( + vector: &PrimitiveVector, + offsets: &[usize], +) -> VectorRef { + Arc::new(replicate_primitive_with_type( + vector, + offsets, + T::build_data_type(), + )) +} + +pub(crate) fn replicate_primitive_with_type( + vector: &PrimitiveVector, + offsets: &[usize], + data_type: ConcreteDataType, +) -> PrimitiveVector { + assert_eq!(offsets.len(), vector.len()); + + if offsets.is_empty() { + return vector.slice(0, 0); + } + + let mut builder = PrimitiveVectorBuilder::::with_type_capacity( + data_type, + *offsets.last().unwrap() as usize, + ); + + let mut previous_offset = 0; + + for (i, offset) in offsets.iter().enumerate() { + let data = unsafe { vector.array.value_unchecked(i) }; + builder.mutable_array.extend( + std::iter::repeat(data) + .take(*offset - previous_offset) + .map(Option::Some), + ); + previous_offset = *offset; + } + builder.finish() +} + #[cfg(test)] mod tests { use arrow::datatypes::DataType as ArrowDataType; @@ -425,20 +446,6 @@ mod tests { assert_eq!(Validity::AllValid, vector.validity()); } - #[test] - fn test_replicate() { - let v = PrimitiveVector::::from_slice((0..5).collect::>()); - - let offsets = [0usize, 1usize, 2usize, 3usize, 4usize]; - - let v = v.replicate(&offsets); - assert_eq!(4, v.len()); - - for i in 0..4 { - assert_eq!(Value::Int32(i as i32 + 1), v.get(i)); - } - } - #[test] fn test_memory_size() { let v = PrimitiveVector::::from_slice((0..5).collect::>()); diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs index 02664d1e26..fc75153581 100644 --- a/src/datatypes/src/vectors/string.rs +++ b/src/datatypes/src/vectors/string.rs @@ -9,7 +9,7 @@ use snafu::{OptionExt, ResultExt}; use crate::arrow_array::{MutableStringArray, StringArray}; use crate::data_type::ConcreteDataType; use crate::error::{Result, SerializeSnafu}; -use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; +use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::types::StringType; use crate::value::{Value, ValueRef}; @@ -21,6 +21,12 @@ pub struct StringVector { array: StringArray, } +impl StringVector { + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } +} + impl From for StringVector { fn from(array: StringArray) -> Self { Self { array } @@ -112,10 +118,6 @@ impl Vector for StringVector { vectors::impl_get_for_vector!(self.array, index) } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - common::replicate_scalar_vector(self, offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { vectors::impl_get_ref_for_vector!(self.array, index) } diff --git a/src/datatypes/src/vectors/timestamp.rs b/src/datatypes/src/vectors/timestamp.rs index 77bf8dea35..0b47dc1f5e 100644 --- a/src/datatypes/src/vectors/timestamp.rs +++ b/src/datatypes/src/vectors/timestamp.rs @@ -48,6 +48,10 @@ impl TimestampVector { }, } } + + pub(crate) fn as_arrow(&self) -> &dyn Array { + self.array.as_arrow() + } } impl Vector for TimestampVector { @@ -117,10 +121,6 @@ impl Vector for TimestampVector { } } - fn replicate(&self, offsets: &[usize]) -> VectorRef { - self.array.replicate(offsets) - } - fn get_ref(&self, index: usize) -> ValueRef { match self.array.get(index) { Value::Int64(v) => ValueRef::Timestamp(Timestamp::from_millis(v)), @@ -247,6 +247,15 @@ impl ScalarVectorBuilder for TimestampVectorBuilder { } } +pub(crate) fn replicate_timestamp(vector: &TimestampVector, offsets: &[usize]) -> VectorRef { + let array = crate::vectors::primitive::replicate_primitive_with_type( + &vector.array, + offsets, + vector.data_type(), + ); + Arc::new(TimestampVector { array }) +} + #[cfg(test)] mod tests { use super::*; @@ -284,4 +293,13 @@ mod tests { vector.iter_data().collect::>() ); } + + #[test] + fn test_timestamp_from_arrow() { + let vector = + TimestampVector::from_slice(&[Timestamp::from_millis(1), Timestamp::from_millis(2)]); + let arrow = vector.as_arrow().slice(0, vector.len()); + let vector2 = TimestampVector::try_from_arrow_array(&arrow).unwrap(); + assert_eq!(vector, vector2); + } }