mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 15:30:40 +00:00
feat: Implement dedup and filter for vectors (#245)
* feat: Dedup vector * refactor: Re-export Date/DateTime/Timestamp * refactor: Named field for ListValueRef::Ref Use a named field `val` instead of a tuple for the variant ListValueRef::Ref to stay consistent with ListValueRef::Indexed * feat: Implement ScalarVector for ListVector Also implements ScalarVectorBuilder for ListVectorBuilder, Scalar for ListValue and ScalarRef for ListValueRef * test: Add tests for ScalarVector implementation of ListVector * feat: Implement dedup using match_scalar_vector * refactor: Move dedup func to individual mod * chore: Update ListValueRef comments * refactor: Move replicate to VectorOp Move compute operations to the VectorOp trait and make it a supertrait of Vector, so that we can later put the dedup/filter methods in the VectorOp trait and avoid defining too many methods in the Vector trait. * refactor: Move scalar bounds to PrimitiveElement Move the Scalar and ScalarRef trait bounds to PrimitiveElement, so that for each native type which implements PrimitiveElement, its PrimitiveVector always implements ScalarVector and can be used as a ScalarVector without adding additional trait bounds * refactor: Move dedup to VectorOp Remove compute mod and move dedup logic to operations::dedup * feat: Implement VectorOp::filter * test: Move replicate test of primitive to replicate.rs * test: Add more replicate tests * test: Add tests for dedup and filter Also fix NullVector::dedup and ConstantVector::dedup * style: fix clippy * chore: Remove unused scalar.rs * test: Add more tests for VectorOp and fix failed tests Also fix the missing equality implementation for TimestampVector. * chore: Address CR comments * chore: mention vector should be sorted in comment * refactor: slice the vector directly in replicate_primitive_with_type
This commit is contained in:
@@ -6,5 +6,8 @@ pub mod timestamp;
|
||||
pub mod timestamp_millis;
|
||||
pub mod util;
|
||||
|
||||
pub use date::Date;
|
||||
pub use datetime::DateTime;
|
||||
pub use range::RangeMillis;
|
||||
pub use timestamp::Timestamp;
|
||||
pub use timestamp_millis::TimestampMillis;
|
||||
|
||||
@@ -50,6 +50,12 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("{}", msg))]
|
||||
CastType { msg: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Arrow failed to compute, source: {}", source))]
|
||||
ArrowCompute {
|
||||
source: arrow::error::ArrowError,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
use std::any::Any;
|
||||
|
||||
use common_time::timestamp::Timestamp;
|
||||
use common_time::{Date, DateTime, Timestamp};
|
||||
|
||||
use crate::prelude::*;
|
||||
use crate::vectors::date::DateVector;
|
||||
use crate::vectors::datetime::DateTimeVector;
|
||||
use crate::value::{ListValue, ListValueRef};
|
||||
use crate::vectors::*;
|
||||
|
||||
pub mod common;
|
||||
|
||||
fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
|
||||
match iter.size_hint() {
|
||||
(_lower, Some(upper)) => upper,
|
||||
@@ -244,9 +241,9 @@ impl<'a> ScalarRef<'a> for &'a [u8] {
|
||||
}
|
||||
}
|
||||
|
||||
impl Scalar for common_time::date::Date {
|
||||
impl Scalar for Date {
|
||||
type VectorType = DateVector;
|
||||
type RefType<'a> = common_time::date::Date;
|
||||
type RefType<'a> = Date;
|
||||
|
||||
fn as_scalar_ref(&self) -> Self::RefType<'_> {
|
||||
*self
|
||||
@@ -257,18 +254,18 @@ impl Scalar for common_time::date::Date {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ScalarRef<'a> for common_time::date::Date {
|
||||
impl<'a> ScalarRef<'a> for Date {
|
||||
type VectorType = DateVector;
|
||||
type ScalarType = common_time::date::Date;
|
||||
type ScalarType = Date;
|
||||
|
||||
fn to_owned_scalar(&self) -> Self::ScalarType {
|
||||
*self
|
||||
}
|
||||
}
|
||||
|
||||
impl Scalar for common_time::datetime::DateTime {
|
||||
impl Scalar for DateTime {
|
||||
type VectorType = DateTimeVector;
|
||||
type RefType<'a> = common_time::datetime::DateTime;
|
||||
type RefType<'a> = DateTime;
|
||||
|
||||
fn as_scalar_ref(&self) -> Self::RefType<'_> {
|
||||
*self
|
||||
@@ -279,9 +276,9 @@ impl Scalar for common_time::datetime::DateTime {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ScalarRef<'a> for common_time::datetime::DateTime {
|
||||
impl<'a> ScalarRef<'a> for DateTime {
|
||||
type VectorType = DateTimeVector;
|
||||
type ScalarType = common_time::datetime::DateTime;
|
||||
type ScalarType = DateTime;
|
||||
|
||||
fn to_owned_scalar(&self) -> Self::ScalarType {
|
||||
*self
|
||||
@@ -310,10 +307,41 @@ impl<'a> ScalarRef<'a> for Timestamp {
|
||||
}
|
||||
}
|
||||
|
||||
impl Scalar for ListValue {
|
||||
type VectorType = ListVector;
|
||||
type RefType<'a> = ListValueRef<'a>;
|
||||
|
||||
fn as_scalar_ref(&self) -> Self::RefType<'_> {
|
||||
ListValueRef::Ref { val: self }
|
||||
}
|
||||
|
||||
fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> {
|
||||
long
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ScalarRef<'a> for ListValueRef<'a> {
|
||||
type VectorType = ListVector;
|
||||
type ScalarType = ListValue;
|
||||
|
||||
fn to_owned_scalar(&self) -> Self::ScalarType {
|
||||
match self {
|
||||
ListValueRef::Indexed { vector, idx } => match vector.get(*idx) {
|
||||
// Normally should not get `Value::Null` if the `ListValueRef` comes
|
||||
// from the iterator of the ListVector, but we avoid panic and just
|
||||
// returns a default list value in such case since `ListValueRef` may
|
||||
// be constructed manually.
|
||||
Value::Null => ListValue::default(),
|
||||
Value::List(v) => v,
|
||||
_ => unreachable!(),
|
||||
},
|
||||
ListValueRef::Ref { val } => (*val).clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_time::date::Date;
|
||||
|
||||
use super::*;
|
||||
use crate::vectors::binary::BinaryVector;
|
||||
use crate::vectors::primitive::Int32Vector;
|
||||
@@ -357,7 +385,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_build_date_vector() {
|
||||
fn test_build_date_vector() {
|
||||
let expect: Vec<Option<Date>> = vec![
|
||||
Some(Date::new(0)),
|
||||
Some(Date::new(-1)),
|
||||
@@ -369,14 +397,49 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_date_scalar() {
|
||||
fn test_date_scalar() {
|
||||
let date = Date::new(1);
|
||||
assert_eq!(date, date.as_scalar_ref());
|
||||
assert_eq!(date, date.to_owned_scalar());
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_build_timestamp_vector() {
|
||||
fn test_datetime_scalar() {
|
||||
let dt = DateTime::new(123);
|
||||
assert_eq!(dt, dt.as_scalar_ref());
|
||||
assert_eq!(dt, dt.to_owned_scalar());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_value_scalar() {
|
||||
let list_value = ListValue::new(
|
||||
Some(Box::new(vec![Value::Int32(123)])),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
);
|
||||
let list_ref = ListValueRef::Ref { val: &list_value };
|
||||
assert_eq!(list_ref, list_value.as_scalar_ref());
|
||||
assert_eq!(list_value, list_ref.to_owned_scalar());
|
||||
|
||||
let mut builder =
|
||||
ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 1);
|
||||
builder.push(None);
|
||||
builder.push(Some(list_value.as_scalar_ref()));
|
||||
let vector = builder.finish();
|
||||
|
||||
let ref_on_vec = ListValueRef::Indexed {
|
||||
vector: &vector,
|
||||
idx: 0,
|
||||
};
|
||||
assert_eq!(ListValue::default(), ref_on_vec.to_owned_scalar());
|
||||
let ref_on_vec = ListValueRef::Indexed {
|
||||
vector: &vector,
|
||||
idx: 1,
|
||||
};
|
||||
assert_eq!(list_value, ref_on_vec.to_owned_scalar());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_timestamp_vector() {
|
||||
let expect: Vec<Option<Timestamp>> = vec![Some(10.into()), None, Some(42.into())];
|
||||
let vector: TimestampVector = build_vector_from_slice(&expect);
|
||||
assert_vector_eq(&expect, &vector);
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
use crate::prelude::*;
|
||||
|
||||
/// Replicates every element of `c` according to `offsets`.
///
/// The i-th element is pushed `offsets[i] - offsets[i - 1]` times (with an
/// implicit `offsets[-1] == 0`), so the result holds `offsets.last()` elements
/// when the offsets are non-decreasing. An empty `offsets` yields an empty
/// slice of `c`.
///
/// In debug builds, asserts that `offsets.len() == c.len()`.
pub fn replicate_scalar_vector<C: ScalarVector>(c: &C, offsets: &[usize]) -> VectorRef {
    debug_assert!(
        offsets.len() == c.len(),
        "Size of offsets must match size of vector"
    );

    // The last offset is the length of the output, so reserve that much up
    // front instead of the (usually smaller) input length.
    let total = match offsets.last() {
        Some(&total) => total,
        None => return c.slice(0, 0),
    };
    let mut builder = <<C as ScalarVector>::Builder>::with_capacity(total);

    let mut previous_offset = 0;
    for (i, &offset) in offsets.iter().enumerate() {
        let data = c.get_data(i);
        // An offset that does not advance produces an empty range, i.e. the
        // element is dropped from the output.
        for _ in previous_offset..offset {
            builder.push(data);
        }
        previous_offset = offset;
    }

    builder.to_vector()
}
|
||||
@@ -1,6 +1,3 @@
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
use crate::data_type::ConcreteDataType;
|
||||
|
||||
/// Unique identifier for logical data type.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum LogicalTypeId {
|
||||
@@ -43,7 +40,9 @@ impl LogicalTypeId {
|
||||
/// # Panics
|
||||
/// Panics if data type is not supported.
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
pub fn data_type(&self) -> ConcreteDataType {
|
||||
pub fn data_type(&self) -> crate::data_type::ConcreteDataType {
|
||||
use crate::data_type::ConcreteDataType;
|
||||
|
||||
match self {
|
||||
LogicalTypeId::Null => ConcreteDataType::null_datatype(),
|
||||
LogicalTypeId::Boolean => ConcreteDataType::boolean_datatype(),
|
||||
|
||||
@@ -45,7 +45,7 @@ impl DataType for ListType {
|
||||
}
|
||||
|
||||
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
|
||||
Box::new(ListVectorBuilder::with_capacity(
|
||||
Box::new(ListVectorBuilder::with_type_capacity(
|
||||
*self.inner.clone(),
|
||||
capacity,
|
||||
))
|
||||
|
||||
@@ -10,6 +10,7 @@ use snafu::OptionExt;
|
||||
use crate::data_type::{ConcreteDataType, DataType};
|
||||
use crate::error::{self, Result};
|
||||
use crate::scalars::ScalarVectorBuilder;
|
||||
use crate::scalars::{Scalar, ScalarRef};
|
||||
use crate::type_id::LogicalTypeId;
|
||||
use crate::types::primitive_traits::Primitive;
|
||||
use crate::value::{Value, ValueRef};
|
||||
@@ -30,7 +31,13 @@ impl<T: Primitive, U: Primitive> PartialEq<PrimitiveType<U>> for PrimitiveType<T
|
||||
impl<T: Primitive> Eq for PrimitiveType<T> {}
|
||||
|
||||
/// A trait that provide helper methods for a primitive type to implementing the [PrimitiveVector].
|
||||
pub trait PrimitiveElement: Primitive {
|
||||
pub trait PrimitiveElement
|
||||
where
|
||||
for<'a> Self: Primitive
|
||||
+ Scalar<VectorType = PrimitiveVector<Self>>
|
||||
+ ScalarRef<'a, ScalarType = Self, VectorType = PrimitiveVector<Self>>
|
||||
+ Scalar<RefType<'a> = Self>,
|
||||
{
|
||||
/// Construct the data type struct.
|
||||
fn build_data_type() -> ConcreteDataType;
|
||||
|
||||
|
||||
@@ -110,7 +110,7 @@ impl Value {
|
||||
Value::Binary(v) => ValueRef::Binary(v),
|
||||
Value::Date(v) => ValueRef::Date(*v),
|
||||
Value::DateTime(v) => ValueRef::DateTime(*v),
|
||||
Value::List(v) => ValueRef::List(ListValueRef::Ref(v)),
|
||||
Value::List(v) => ValueRef::List(ListValueRef::Ref { val: v }),
|
||||
Value::Timestamp(v) => ValueRef::Timestamp(*v),
|
||||
}
|
||||
}
|
||||
@@ -282,6 +282,12 @@ impl ListValue {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ListValue {
|
||||
fn default() -> ListValue {
|
||||
ListValue::new(None, ConcreteDataType::null_datatype())
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for ListValue {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
@@ -464,19 +470,32 @@ impl<'a> From<&'a [u8]> for ValueRef<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
|
||||
fn from(list: Option<ListValueRef>) -> ValueRef {
|
||||
match list {
|
||||
Some(v) => ValueRef::List(v),
|
||||
None => ValueRef::Null,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reference to a [ListValue].
|
||||
// Comparison still requires some allocation (call of `to_value()`) and might be avoidable.
|
||||
///
|
||||
/// Now comparison still requires some allocation (call of `to_value()`) and
|
||||
/// might be avoidable by downcasting and comparing the underlying array slice
|
||||
/// if it becomes bottleneck.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum ListValueRef<'a> {
|
||||
Indexed { vector: &'a ListVector, idx: usize },
|
||||
Ref(&'a ListValue),
|
||||
Ref { val: &'a ListValue },
|
||||
}
|
||||
|
||||
impl<'a> ListValueRef<'a> {
|
||||
/// Convert self to [Value]. This method would clone the underlying data.
|
||||
fn to_value(self) -> Value {
|
||||
match self {
|
||||
ListValueRef::Indexed { vector, idx } => vector.get(idx),
|
||||
ListValueRef::Ref(v) => Value::List((*v).clone()),
|
||||
ListValueRef::Ref { val } => Value::List(val.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -796,7 +815,7 @@ mod tests {
|
||||
datatype: ConcreteDataType::int32_datatype(),
|
||||
};
|
||||
assert_eq!(
|
||||
ValueRef::List(ListValueRef::Ref(&list)),
|
||||
ValueRef::List(ListValueRef::Ref { val: &list }),
|
||||
Value::List(list.clone()).as_value_ref()
|
||||
);
|
||||
}
|
||||
@@ -831,7 +850,7 @@ mod tests {
|
||||
items: None,
|
||||
datatype: ConcreteDataType::int32_datatype(),
|
||||
};
|
||||
check_as_correct!(ListValueRef::Ref(&list), List, as_list);
|
||||
check_as_correct!(ListValueRef::Ref { val: &list }, List, as_list);
|
||||
|
||||
let wrong_value = ValueRef::Int32(12345);
|
||||
assert!(wrong_value.as_binary().is_err());
|
||||
|
||||
@@ -9,10 +9,21 @@ mod helper;
|
||||
mod list;
|
||||
pub mod mutable;
|
||||
pub mod null;
|
||||
mod operations;
|
||||
pub mod primitive;
|
||||
mod string;
|
||||
mod timestamp;
|
||||
|
||||
pub mod all {
|
||||
//! All vector types.
|
||||
pub use crate::vectors::{
|
||||
BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Float32Vector,
|
||||
Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, ListVector, NullVector,
|
||||
PrimitiveVector, StringVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector,
|
||||
UInt8Vector,
|
||||
};
|
||||
}
|
||||
|
||||
use std::any::Any;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
@@ -29,6 +40,7 @@ pub use helper::Helper;
|
||||
pub use list::*;
|
||||
pub use mutable::MutableVector;
|
||||
pub use null::*;
|
||||
pub use operations::VectorOp;
|
||||
pub use primitive::*;
|
||||
use snafu::ensure;
|
||||
pub use string::*;
|
||||
@@ -59,7 +71,7 @@ impl<'a> Validity<'a> {
|
||||
}
|
||||
|
||||
/// Vector of data values.
|
||||
pub trait Vector: Send + Sync + Serializable + Debug {
|
||||
pub trait Vector: Send + Sync + Serializable + Debug + VectorOp {
|
||||
/// Returns the data type of the vector.
|
||||
///
|
||||
/// This may require heap allocation.
|
||||
@@ -140,10 +152,6 @@ pub trait Vector: Send + Sync + Serializable + Debug {
|
||||
Ok(self.get(index))
|
||||
}
|
||||
|
||||
// Copies each element according offsets parameter.
|
||||
// (i-th element should be copied offsets[i] - offsets[i - 1] times.)
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef;
|
||||
|
||||
/// Returns the reference of value at `index`.
|
||||
///
|
||||
/// # Panics
|
||||
|
||||
@@ -9,7 +9,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
use crate::arrow_array::{BinaryArray, MutableBinaryArray};
|
||||
use crate::data_type::ConcreteDataType;
|
||||
use crate::error::{self, Result};
|
||||
use crate::scalars::{common, ScalarVector, ScalarVectorBuilder};
|
||||
use crate::scalars::{ScalarVector, ScalarVectorBuilder};
|
||||
use crate::serialize::Serializable;
|
||||
use crate::value::{Value, ValueRef};
|
||||
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
|
||||
@@ -20,6 +20,12 @@ pub struct BinaryVector {
|
||||
array: BinaryArray,
|
||||
}
|
||||
|
||||
impl BinaryVector {
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
&self.array
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BinaryArray> for BinaryVector {
|
||||
fn from(array: BinaryArray) -> Self {
|
||||
Self { array }
|
||||
@@ -79,10 +85,6 @@ impl Vector for BinaryVector {
|
||||
vectors::impl_get_for_vector!(self.array, index)
|
||||
}
|
||||
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
common::replicate_scalar_vector(self, offsets)
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
vectors::impl_get_ref_for_vector!(self.array, index)
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::data_type::ConcreteDataType;
|
||||
use crate::error::Result;
|
||||
use crate::scalars::common::replicate_scalar_vector;
|
||||
use crate::scalars::{ScalarVector, ScalarVectorBuilder};
|
||||
use crate::serialize::Serializable;
|
||||
use crate::value::{Value, ValueRef};
|
||||
@@ -20,6 +19,16 @@ pub struct BooleanVector {
|
||||
array: BooleanArray,
|
||||
}
|
||||
|
||||
impl BooleanVector {
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
&self.array
|
||||
}
|
||||
|
||||
pub(crate) fn as_boolean_array(&self) -> &BooleanArray {
|
||||
&self.array
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<bool>> for BooleanVector {
|
||||
fn from(data: Vec<bool>) -> Self {
|
||||
BooleanVector {
|
||||
@@ -95,10 +104,6 @@ impl Vector for BooleanVector {
|
||||
vectors::impl_get_for_vector!(self.array, index)
|
||||
}
|
||||
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
replicate_scalar_vector(self, offsets)
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
vectors::impl_get_ref_for_vector!(self.array, index)
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::error::{Result, SerializeSnafu};
|
||||
use crate::serialize::Serializable;
|
||||
use crate::value::{Value, ValueRef};
|
||||
use crate::vectors::Helper;
|
||||
use crate::vectors::{Validity, Vector, VectorRef};
|
||||
use crate::vectors::{BooleanVector, Validity, Vector, VectorRef};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConstantVector {
|
||||
@@ -19,7 +19,13 @@ pub struct ConstantVector {
|
||||
}
|
||||
|
||||
impl ConstantVector {
|
||||
/// Create a new [ConstantVector].
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `vector.len() != 1`.
|
||||
pub fn new(vector: VectorRef, length: usize) -> Self {
|
||||
assert_eq!(1, vector.len());
|
||||
|
||||
// Avoid const recursion.
|
||||
if vector.is_const() {
|
||||
let vec: &ConstantVector = unsafe { Helper::static_cast(&vector) };
|
||||
@@ -31,6 +37,11 @@ impl ConstantVector {
|
||||
pub fn inner(&self) -> &VectorRef {
|
||||
&self.vector
|
||||
}
|
||||
|
||||
/// Returns the constant value.
|
||||
pub fn get_constant_ref(&self) -> ValueRef {
|
||||
self.vector.get_ref(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Vector for ConstantVector {
|
||||
@@ -95,15 +106,6 @@ impl Vector for ConstantVector {
|
||||
self.vector.get(0)
|
||||
}
|
||||
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
debug_assert!(
|
||||
offsets.len() == self.len(),
|
||||
"Size of offsets must match size of column"
|
||||
);
|
||||
|
||||
Arc::new(Self::new(self.vector.clone(), *offsets.last().unwrap()))
|
||||
}
|
||||
|
||||
fn get_ref(&self, _index: usize) -> ValueRef {
|
||||
self.vector.get_ref(0)
|
||||
}
|
||||
@@ -111,18 +113,13 @@ impl Vector for ConstantVector {
|
||||
|
||||
impl fmt::Debug for ConstantVector {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"ConstantVector([{:?}; {}])",
|
||||
self.try_get(0).unwrap_or(Value::Null),
|
||||
self.len()
|
||||
)
|
||||
write!(f, "ConstantVector([{:?}; {}])", self.get(0), self.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl Serializable for ConstantVector {
|
||||
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
|
||||
std::iter::repeat(self.try_get(0)?)
|
||||
std::iter::repeat(self.get(0))
|
||||
.take(self.len())
|
||||
.map(serde_json::Value::try_from)
|
||||
.collect::<serde_json::Result<_>>()
|
||||
@@ -130,6 +127,33 @@ impl Serializable for ConstantVector {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_constant(vector: &ConstantVector, offsets: &[usize]) -> VectorRef {
|
||||
assert_eq!(offsets.len(), vector.len());
|
||||
|
||||
if offsets.is_empty() {
|
||||
return vector.slice(0, 0);
|
||||
}
|
||||
|
||||
Arc::new(ConstantVector::new(
|
||||
vector.vector.clone(),
|
||||
*offsets.last().unwrap(),
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn filter_constant(
|
||||
vector: &ConstantVector,
|
||||
filter: &BooleanVector,
|
||||
) -> Result<VectorRef> {
|
||||
let length = filter.len() - filter.as_boolean_array().values().null_count();
|
||||
if length == vector.len() {
|
||||
return Ok(Arc::new(vector.clone()));
|
||||
}
|
||||
Ok(Arc::new(ConstantVector::new(
|
||||
vector.inner().clone(),
|
||||
length,
|
||||
)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
|
||||
@@ -36,6 +36,10 @@ impl DateVector {
|
||||
.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
self.array.as_arrow()
|
||||
}
|
||||
}
|
||||
|
||||
impl Vector for DateVector {
|
||||
@@ -103,10 +107,6 @@ impl Vector for DateVector {
|
||||
}
|
||||
}
|
||||
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
self.array.replicate(offsets)
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
match self.array.get(index) {
|
||||
Value::Int32(v) => ValueRef::Date(Date::new(v)),
|
||||
@@ -236,6 +236,15 @@ impl ScalarVectorBuilder for DateVectorBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_date(vector: &DateVector, offsets: &[usize]) -> VectorRef {
|
||||
let array = crate::vectors::primitive::replicate_primitive_with_type(
|
||||
&vector.array,
|
||||
offsets,
|
||||
vector.data_type(),
|
||||
);
|
||||
Arc::new(DateVector { array })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -293,4 +302,12 @@ mod tests {
|
||||
]));
|
||||
assert_eq!(expect, vector);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_date_from_arrow() {
|
||||
let vector = DateVector::from_slice(&[Date::new(1), Date::new(2)]);
|
||||
let arrow = vector.as_arrow().slice(0, vector.len());
|
||||
let vector2 = DateVector::try_from_arrow_array(&arrow).unwrap();
|
||||
assert_eq!(vector, vector2);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,10 @@ impl DateTimeVector {
|
||||
.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
self.array.as_arrow()
|
||||
}
|
||||
}
|
||||
|
||||
impl Vector for DateTimeVector {
|
||||
@@ -104,10 +108,6 @@ impl Vector for DateTimeVector {
|
||||
}
|
||||
}
|
||||
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
self.array.replicate(offsets)
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
match self.array.get(index) {
|
||||
Value::Int64(v) => ValueRef::DateTime(DateTime::new(v)),
|
||||
@@ -236,6 +236,15 @@ impl ScalarVector for DateTimeVector {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_datetime(vector: &DateTimeVector, offsets: &[usize]) -> VectorRef {
|
||||
let array = crate::vectors::primitive::replicate_primitive_with_type(
|
||||
&vector.array,
|
||||
offsets,
|
||||
vector.data_type(),
|
||||
);
|
||||
Arc::new(DateTimeVector { array })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
@@ -312,4 +321,12 @@ mod tests {
|
||||
]));
|
||||
assert_eq!(expect, vector);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_datetime_from_arrow() {
|
||||
let vector = DateTimeVector::from_slice(&[DateTime::new(1), DateTime::new(2)]);
|
||||
let arrow = vector.as_arrow().slice(0, vector.len());
|
||||
let vector2 = DateTimeVector::try_from_arrow_array(&arrow).unwrap();
|
||||
assert_eq!(vector, vector2);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||
use crate::data_type::DataType;
|
||||
use crate::vectors::{
|
||||
BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, ListVector,
|
||||
PrimitiveVector, StringVector, Vector,
|
||||
PrimitiveVector, StringVector, TimestampVector, Vector,
|
||||
};
|
||||
use crate::with_match_primitive_type_id;
|
||||
|
||||
@@ -54,6 +54,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool {
|
||||
|
||||
use crate::data_type::ConcreteDataType::*;
|
||||
|
||||
let lhs_type = lhs.data_type();
|
||||
match lhs.data_type() {
|
||||
Null(_) => true,
|
||||
Boolean(_) => is_vector_eq!(BooleanVector, lhs, rhs),
|
||||
@@ -61,27 +62,32 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool {
|
||||
String(_) => is_vector_eq!(StringVector, lhs, rhs),
|
||||
Date(_) => is_vector_eq!(DateVector, lhs, rhs),
|
||||
DateTime(_) => is_vector_eq!(DateTimeVector, lhs, rhs),
|
||||
Timestamp(_) => is_vector_eq!(TimestampVector, lhs, rhs),
|
||||
List(_) => is_vector_eq!(ListVector, lhs, rhs),
|
||||
other => with_match_primitive_type_id!(other.logical_type_id(), |$T| {
|
||||
let lhs = lhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap();
|
||||
let rhs = rhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap();
|
||||
UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_)
|
||||
| Float32(_) | Float64(_) => {
|
||||
with_match_primitive_type_id!(lhs_type.logical_type_id(), |$T| {
|
||||
let lhs = lhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap();
|
||||
let rhs = rhs.as_any().downcast_ref::<PrimitiveVector<$T>>().unwrap();
|
||||
|
||||
lhs == rhs
|
||||
},
|
||||
{
|
||||
unreachable!()
|
||||
}),
|
||||
lhs == rhs
|
||||
},
|
||||
{
|
||||
unreachable!("should not compare {} with {}", lhs.vector_type_name(), rhs.vector_type_name())
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::array::{Int64Array, ListArray, MutableListArray, MutablePrimitiveArray, TryExtend};
|
||||
use arrow::array::{ListArray, MutableListArray, MutablePrimitiveArray, TryExtend};
|
||||
|
||||
use super::*;
|
||||
use crate::vectors::{
|
||||
Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector,
|
||||
NullVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, VectorRef,
|
||||
NullVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector,
|
||||
VectorRef,
|
||||
};
|
||||
|
||||
fn assert_vector_ref_eq(vector: VectorRef) {
|
||||
@@ -111,10 +117,8 @@ mod tests {
|
||||
)));
|
||||
assert_vector_ref_eq(Arc::new(BooleanVector::from(vec![true, false])));
|
||||
assert_vector_ref_eq(Arc::new(DateVector::from(vec![Some(100), Some(120)])));
|
||||
assert_vector_ref_eq(Arc::new(DateTimeVector::new(Int64Array::from(vec![
|
||||
Some(100),
|
||||
Some(120),
|
||||
]))));
|
||||
assert_vector_ref_eq(Arc::new(DateTimeVector::from(vec![Some(100), Some(120)])));
|
||||
assert_vector_ref_eq(Arc::new(TimestampVector::from_values([100, 120])));
|
||||
|
||||
let mut arrow_array = MutableListArray::<i32, MutablePrimitiveArray<i64>>::new();
|
||||
arrow_array
|
||||
@@ -171,7 +175,7 @@ mod tests {
|
||||
5,
|
||||
)),
|
||||
Arc::new(ConstantVector::new(
|
||||
Arc::new(BooleanVector::from(vec![true, false])),
|
||||
Arc::new(BooleanVector::from(vec![false])),
|
||||
4,
|
||||
)),
|
||||
);
|
||||
@@ -181,7 +185,7 @@ mod tests {
|
||||
5,
|
||||
)),
|
||||
Arc::new(ConstantVector::new(
|
||||
Arc::new(Int32Vector::from_slice(vec![1, 2])),
|
||||
Arc::new(Int32Vector::from_slice(vec![1])),
|
||||
4,
|
||||
)),
|
||||
);
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use std::any::Any;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{Array, ArrayRef, ListArray};
|
||||
use arrow::bitmap::utils::ZipValidity;
|
||||
use arrow::bitmap::MutableBitmap;
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use serde_json::Value as JsonValue;
|
||||
@@ -24,9 +26,17 @@ pub struct ListVector {
|
||||
}
|
||||
|
||||
impl ListVector {
|
||||
/// Only iterate values in the [ListVector].
|
||||
///
|
||||
/// Be careful to use this method as it would ignore validity and replace null
|
||||
/// by empty vector.
|
||||
pub fn values_iter(&self) -> Box<dyn Iterator<Item = Result<VectorRef>> + '_> {
|
||||
Box::new(self.array.values_iter().map(VectorHelper::try_into_vector))
|
||||
}
|
||||
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
&self.array
|
||||
}
|
||||
}
|
||||
|
||||
impl Vector for ListVector {
|
||||
@@ -93,13 +103,6 @@ impl Vector for ListVector {
|
||||
))
|
||||
}
|
||||
|
||||
fn replicate(&self, _: &[usize]) -> VectorRef {
|
||||
// ListVector can be a scalar vector for implementing this `replicate` method. However,
|
||||
// that requires a lot of efforts, starting from not using Arrow's ListArray.
|
||||
// Refer to Databend's `ArrayColumn` for more details.
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
ValueRef::List(ListValueRef::Indexed {
|
||||
vector: self,
|
||||
@@ -137,6 +140,70 @@ impl From<ArrowListArray> for ListVector {
|
||||
|
||||
impl_try_from_arrow_array_for_vector!(ArrowListArray, ListVector);
|
||||
|
||||
pub struct ListVectorIter<'a> {
|
||||
vector: &'a ListVector,
|
||||
iter: ZipValidity<'a, usize, Range<usize>>,
|
||||
}
|
||||
|
||||
impl<'a> ListVectorIter<'a> {
|
||||
pub fn new(vector: &'a ListVector) -> ListVectorIter<'a> {
|
||||
let iter = ZipValidity::new(
|
||||
0..vector.len(),
|
||||
vector.array.validity().as_ref().map(|x| x.iter()),
|
||||
);
|
||||
|
||||
Self { vector, iter }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for ListVectorIter<'a> {
|
||||
type Item = Option<ListValueRef<'a>>;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.iter.next().map(|idx_opt| {
|
||||
idx_opt.map(|idx| ListValueRef::Indexed {
|
||||
vector: self.vector,
|
||||
idx,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.iter.size_hint()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn nth(&mut self, n: usize) -> Option<Self::Item> {
|
||||
self.iter.nth(n).map(|idx_opt| {
|
||||
idx_opt.map(|idx| ListValueRef::Indexed {
|
||||
vector: self.vector,
|
||||
idx,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarVector for ListVector {
|
||||
type OwnedItem = ListValue;
|
||||
type RefItem<'a> = ListValueRef<'a>;
|
||||
type Iter<'a> = ListVectorIter<'a>;
|
||||
type Builder = ListVectorBuilder;
|
||||
|
||||
fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
|
||||
if self.array.is_valid(idx) {
|
||||
Some(ListValueRef::Indexed { vector: self, idx })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn iter_data(&self) -> Self::Iter<'_> {
|
||||
ListVectorIter::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
// Some codes are ported from arrow2's MutableListArray.
|
||||
pub struct ListVectorBuilder {
|
||||
inner_type: ConcreteDataType,
|
||||
@@ -146,7 +213,7 @@ pub struct ListVectorBuilder {
|
||||
}
|
||||
|
||||
impl ListVectorBuilder {
|
||||
pub fn with_capacity(inner_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder {
|
||||
pub fn with_type_capacity(inner_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder {
|
||||
let mut offsets = Vec::with_capacity(capacity + 1);
|
||||
offsets.push(0);
|
||||
// The actual required capacity might greater than the capacity of the `ListVector`
|
||||
@@ -224,19 +291,7 @@ impl MutableVector for ListVectorBuilder {
|
||||
}
|
||||
|
||||
fn to_vector(&mut self) -> VectorRef {
|
||||
let array = ArrowListArray::try_new(
|
||||
ConcreteDataType::list_datatype(self.inner_type.clone()).as_arrow_type(),
|
||||
std::mem::take(&mut self.offsets).into(),
|
||||
self.values.to_vector().to_arrow_array(),
|
||||
std::mem::take(&mut self.validity).map(|x| x.into()),
|
||||
)
|
||||
.unwrap(); // The `ListVectorBuilder` itself should ensure it always builds a valid array.
|
||||
|
||||
let vector = ListVector {
|
||||
array,
|
||||
inner_datatype: self.inner_type.clone(),
|
||||
};
|
||||
Arc::new(vector)
|
||||
Arc::new(self.finish())
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
@@ -246,7 +301,7 @@ impl MutableVector for ListVectorBuilder {
|
||||
Some(list_value) => self.push_list_value(list_value)?,
|
||||
None => self.push_null(),
|
||||
},
|
||||
ListValueRef::Ref(list_value) => self.push_list_value(list_value)?,
|
||||
ListValueRef::Ref { val } => self.push_list_value(val)?,
|
||||
}
|
||||
} else {
|
||||
self.push_null();
|
||||
@@ -265,6 +320,41 @@ impl MutableVector for ListVectorBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarVectorBuilder for ListVectorBuilder {
|
||||
type VectorType = ListVector;
|
||||
|
||||
fn with_capacity(_capacity: usize) -> Self {
|
||||
panic!("Must use ListVectorBuilder::with_type_capacity()");
|
||||
}
|
||||
|
||||
fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
|
||||
// We expect the input ListValue has the same inner type as the builder when using
|
||||
// push(), so just panic if `push_value_ref()` returns error, which indicate an
|
||||
// invalid input value type.
|
||||
self.push_value_ref(value.into()).unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"Failed to push value, expect value type {:?}, err:{}",
|
||||
self.inner_type, e
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Self::VectorType {
|
||||
let array = ArrowListArray::try_new(
|
||||
ConcreteDataType::list_datatype(self.inner_type.clone()).as_arrow_type(),
|
||||
std::mem::take(&mut self.offsets).into(),
|
||||
self.values.to_vector().to_arrow_array(),
|
||||
std::mem::take(&mut self.validity).map(|x| x.into()),
|
||||
)
|
||||
.unwrap(); // The `ListVectorBuilder` itself should ensure it always builds a valid array.
|
||||
|
||||
ListVector {
|
||||
array,
|
||||
inner_datatype: self.inner_type.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::array::{MutableListArray, MutablePrimitiveArray, TryExtend};
|
||||
@@ -445,14 +535,16 @@ mod tests {
|
||||
let mut builder =
|
||||
ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3);
|
||||
builder
|
||||
.push_value_ref(ValueRef::List(ListValueRef::Ref(&ListValue::new(
|
||||
Some(Box::new(vec![
|
||||
Value::Int32(4),
|
||||
Value::Null,
|
||||
Value::Int32(6),
|
||||
])),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
))))
|
||||
.push_value_ref(ValueRef::List(ListValueRef::Ref {
|
||||
val: &ListValue::new(
|
||||
Some(Box::new(vec![
|
||||
Value::Int32(4),
|
||||
Value::Null,
|
||||
Value::Int32(6),
|
||||
])),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
),
|
||||
}))
|
||||
.unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
|
||||
@@ -475,4 +567,59 @@ mod tests {
|
||||
]));
|
||||
assert_eq!(expect, vector);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_vector_for_scalar() {
|
||||
let mut builder =
|
||||
ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 2);
|
||||
builder.push(None);
|
||||
builder.push(Some(ListValueRef::Ref {
|
||||
val: &ListValue::new(
|
||||
Some(Box::new(vec![
|
||||
Value::Int32(4),
|
||||
Value::Null,
|
||||
Value::Int32(6),
|
||||
])),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
),
|
||||
}));
|
||||
let vector = builder.finish();
|
||||
|
||||
let expect = new_list_vector(vec![None, Some(vec![Some(4), None, Some(6)])]);
|
||||
assert_eq!(expect, vector);
|
||||
|
||||
assert!(vector.get_data(0).is_none());
|
||||
assert_eq!(
|
||||
ListValueRef::Indexed {
|
||||
vector: &vector,
|
||||
idx: 1
|
||||
},
|
||||
vector.get_data(1).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
*vector.get(1).as_list().unwrap().unwrap(),
|
||||
vector.get_data(1).unwrap().to_owned_scalar()
|
||||
);
|
||||
|
||||
let mut iter = vector.iter_data();
|
||||
assert!(iter.next().unwrap().is_none());
|
||||
assert_eq!(
|
||||
ListValueRef::Indexed {
|
||||
vector: &vector,
|
||||
idx: 1
|
||||
},
|
||||
iter.next().unwrap().unwrap()
|
||||
);
|
||||
assert!(iter.next().is_none());
|
||||
|
||||
let mut iter = vector.iter_data();
|
||||
assert_eq!(2, iter.size_hint().0);
|
||||
assert_eq!(
|
||||
ListValueRef::Indexed {
|
||||
vector: &vector,
|
||||
idx: 1
|
||||
},
|
||||
iter.nth(1).unwrap().unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,10 @@ impl NullVector {
|
||||
array: NullArray::new(ArrowDataType::Null, n),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
&self.array
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NullArray> for NullVector {
|
||||
@@ -83,17 +87,6 @@ impl Vector for NullVector {
|
||||
Value::Null
|
||||
}
|
||||
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
debug_assert!(
|
||||
offsets.len() == self.len(),
|
||||
"Size of offsets must match size of column"
|
||||
);
|
||||
|
||||
Arc::new(Self {
|
||||
array: NullArray::new(ArrowDataType::Null, *offsets.last().unwrap() as usize),
|
||||
})
|
||||
}
|
||||
|
||||
fn get_ref(&self, _index: usize) -> ValueRef {
|
||||
// Skips bound check for null array.
|
||||
ValueRef::Null
|
||||
@@ -179,6 +172,12 @@ impl MutableVector for NullVectorBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef {
|
||||
assert_eq!(offsets.len(), vector.len());
|
||||
|
||||
Arc::new(NullVector::new(*offsets.last().unwrap()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use serde_json;
|
||||
|
||||
121
src/datatypes/src/vectors/operations.rs
Normal file
121
src/datatypes/src/vectors/operations.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
mod dedup;
|
||||
mod filter;
|
||||
mod replicate;
|
||||
|
||||
use arrow::bitmap::MutableBitmap;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::types::PrimitiveElement;
|
||||
use crate::vectors::all::*;
|
||||
use crate::vectors::{Vector, VectorRef};
|
||||
|
||||
/// Vector compute operations.
|
||||
pub trait VectorOp {
|
||||
/// Copies each element according to the `offsets` parameter.
|
||||
/// (`i-th` element should be copied `offsets[i] - offsets[i - 1]` times.)
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `offsets.len() != self.len()`.
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef;
|
||||
|
||||
/// Dedup elements in `self` and mark `i-th` bit of `selected` to `true` if the `i-th` element
|
||||
/// of `self` is retained.
|
||||
///
|
||||
/// The caller should ensure
|
||||
/// 1. the `selected` bitmap is initialized by setting `[0, vector.len())`
|
||||
/// bits to false.
|
||||
/// 2. `vector` and `prev_vector` are sorted.
|
||||
///
|
||||
/// If there are multiple duplicate elements, this function retains the **first** element.
|
||||
/// If the first element of `self` is equal to the last element of `prev_vector`, then that
|
||||
/// first element is also considered as duplicated and won't be retained.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if
|
||||
/// - `selected.len() < self.len()`.
|
||||
/// - `prev_vector` and `self` have different data types.
|
||||
fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>);
|
||||
|
||||
/// Filters the vector, returns elements matching the `filter` (i.e. where the values are true).
|
||||
///
|
||||
/// Note that nulls in `filter` are interpreted as `false`, which leads to the corresponding elements being masked out.
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef>;
|
||||
}
|
||||
|
||||
macro_rules! impl_scalar_vector_op {
|
||||
($( { $VectorType: ident, $replicate: ident } ),+) => {$(
|
||||
impl VectorOp for $VectorType {
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
replicate::$replicate(self, offsets)
|
||||
}
|
||||
|
||||
fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector = prev_vector.map(|pv| pv.as_any().downcast_ref::<$VectorType>().unwrap());
|
||||
dedup::dedup_scalar(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_non_constant!(self, $VectorType, filter)
|
||||
}
|
||||
}
|
||||
)+};
|
||||
}
|
||||
|
||||
impl_scalar_vector_op!(
|
||||
{ BinaryVector, replicate_scalar },
|
||||
{ BooleanVector, replicate_scalar },
|
||||
{ ListVector, replicate_scalar },
|
||||
{ StringVector, replicate_scalar },
|
||||
{ DateVector, replicate_date },
|
||||
{ DateTimeVector, replicate_datetime },
|
||||
{ TimestampVector, replicate_timestamp }
|
||||
);
|
||||
|
||||
impl VectorOp for ConstantVector {
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
replicate::replicate_constant(self, offsets)
|
||||
}
|
||||
|
||||
fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<ConstantVector>());
|
||||
dedup::dedup_constant(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_constant(self, filter)
|
||||
}
|
||||
}
|
||||
|
||||
impl VectorOp for NullVector {
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
replicate::replicate_null(self, offsets)
|
||||
}
|
||||
|
||||
fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<NullVector>());
|
||||
dedup::dedup_null(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_non_constant!(self, NullVector, filter)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> VectorOp for PrimitiveVector<T>
|
||||
where
|
||||
T: PrimitiveElement,
|
||||
{
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
replicate::replicate_primitive(self, offsets)
|
||||
}
|
||||
|
||||
fn dedup(&self, selected: &mut MutableBitmap, prev_vector: Option<&dyn Vector>) {
|
||||
let prev_vector =
|
||||
prev_vector.and_then(|pv| pv.as_any().downcast_ref::<PrimitiveVector<T>>());
|
||||
dedup::dedup_scalar(self, selected, prev_vector);
|
||||
}
|
||||
|
||||
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
|
||||
filter::filter_non_constant!(self, PrimitiveVector<T>, filter)
|
||||
}
|
||||
}
|
||||
223
src/datatypes/src/vectors/operations/dedup.rs
Normal file
223
src/datatypes/src/vectors/operations/dedup.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
use arrow::bitmap::MutableBitmap;
|
||||
|
||||
use crate::scalars::ScalarVector;
|
||||
use crate::vectors::{ConstantVector, NullVector, Vector};
|
||||
|
||||
pub(crate) fn dedup_scalar<'a, T: ScalarVector>(
|
||||
vector: &'a T,
|
||||
selected: &'a mut MutableBitmap,
|
||||
prev_vector: Option<&'a T>,
|
||||
) where
|
||||
T::RefItem<'a>: PartialEq,
|
||||
{
|
||||
assert!(selected.len() >= vector.len());
|
||||
|
||||
if vector.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
for ((i, current), next) in vector
|
||||
.iter_data()
|
||||
.enumerate()
|
||||
.zip(vector.iter_data().skip(1))
|
||||
{
|
||||
if current != next {
|
||||
// If next element is a different element, we mark it as selected.
|
||||
selected.set(i + 1, true);
|
||||
}
|
||||
}
|
||||
|
||||
// Always retain the first element.
|
||||
selected.set(0, true);
|
||||
|
||||
// Then check whether still keep the first element based last element in previous vector.
|
||||
if let Some(pv) = &prev_vector {
|
||||
if !pv.is_empty() {
|
||||
let last = pv.get_data(pv.len() - 1);
|
||||
if last == vector.get_data(0) {
|
||||
selected.set(0, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn dedup_null(
|
||||
vector: &NullVector,
|
||||
selected: &mut MutableBitmap,
|
||||
prev_vector: Option<&NullVector>,
|
||||
) {
|
||||
if vector.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let no_prev_element = prev_vector.map(|v| v.is_empty()).unwrap_or(true);
|
||||
if no_prev_element {
|
||||
// Retain first element if no previous element (we known that it must
|
||||
// be null).
|
||||
selected.set(0, true);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn dedup_constant(
|
||||
vector: &ConstantVector,
|
||||
selected: &mut MutableBitmap,
|
||||
prev_vector: Option<&ConstantVector>,
|
||||
) {
|
||||
if vector.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let equal_to_prev = if let Some(prev) = prev_vector {
|
||||
!prev.is_empty() && vector.get_constant_ref() == prev.get_constant_ref()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if !equal_to_prev {
|
||||
selected.set(0, true);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::vectors::{Int32Vector, StringVector, VectorOp};
|
||||
|
||||
fn check_bitmap(expect: &[bool], selected: &MutableBitmap) {
|
||||
assert_eq!(expect.len(), selected.len());
|
||||
for (exp, v) in expect.iter().zip(selected.iter()) {
|
||||
assert_eq!(*exp, v);
|
||||
}
|
||||
}
|
||||
|
||||
fn check_dedup_scalar(expect: &[bool], input: &[i32], prev: Option<&[i32]>) {
|
||||
check_dedup_scalar_opt(expect, input.iter().map(|v| Some(*v)), prev);
|
||||
}
|
||||
|
||||
fn check_dedup_scalar_opt(
|
||||
expect: &[bool],
|
||||
input: impl Iterator<Item = Option<i32>>,
|
||||
prev: Option<&[i32]>,
|
||||
) {
|
||||
let input = Int32Vector::from_iter(input);
|
||||
let prev = prev.map(Int32Vector::from_slice);
|
||||
|
||||
let mut selected = MutableBitmap::from_len_zeroed(input.len());
|
||||
input.dedup(&mut selected, prev.as_ref().map(|v| v as _));
|
||||
|
||||
check_bitmap(expect, &selected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_scalar() {
|
||||
check_dedup_scalar(&[], &[], None);
|
||||
check_dedup_scalar(&[true], &[1], None);
|
||||
check_dedup_scalar(&[true, false], &[1, 1], None);
|
||||
check_dedup_scalar(&[true, true], &[1, 2], None);
|
||||
check_dedup_scalar(&[true, true, true, true], &[1, 2, 3, 4], None);
|
||||
check_dedup_scalar(&[true, false, true, false], &[1, 1, 3, 3], None);
|
||||
check_dedup_scalar(&[true, false, false, false, true], &[2, 2, 2, 2, 3], None);
|
||||
|
||||
check_dedup_scalar(&[true], &[5], Some(&[]));
|
||||
check_dedup_scalar(&[true], &[5], Some(&[3]));
|
||||
check_dedup_scalar(&[false], &[5], Some(&[5]));
|
||||
check_dedup_scalar(&[false], &[5], Some(&[4, 5]));
|
||||
check_dedup_scalar(&[false, true], &[5, 6], Some(&[4, 5]));
|
||||
check_dedup_scalar(&[false, true, false], &[5, 6, 6], Some(&[4, 5]));
|
||||
check_dedup_scalar(
|
||||
&[false, true, false, true, true],
|
||||
&[5, 6, 6, 7, 8],
|
||||
Some(&[4, 5]),
|
||||
);
|
||||
|
||||
check_dedup_scalar_opt(
|
||||
&[true, true, false, true, false],
|
||||
[Some(1), Some(2), Some(2), None, None].into_iter(),
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
fn check_dedup_null(len: usize) {
|
||||
let input = NullVector::new(len);
|
||||
let mut selected = MutableBitmap::from_len_zeroed(input.len());
|
||||
input.dedup(&mut selected, None);
|
||||
|
||||
let mut expect = vec![false; len];
|
||||
if !expect.is_empty() {
|
||||
expect[0] = true;
|
||||
}
|
||||
check_bitmap(&expect, &selected);
|
||||
|
||||
let mut selected = MutableBitmap::from_len_zeroed(input.len());
|
||||
let prev = Some(NullVector::new(1));
|
||||
input.dedup(&mut selected, prev.as_ref().map(|v| v as _));
|
||||
let expect = vec![false; len];
|
||||
check_bitmap(&expect, &selected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_null() {
|
||||
for len in 0..5 {
|
||||
check_dedup_null(len);
|
||||
}
|
||||
}
|
||||
|
||||
fn check_dedup_constant(len: usize) {
|
||||
let input = ConstantVector::new(Arc::new(Int32Vector::from_slice(&[8])), len);
|
||||
let mut selected = MutableBitmap::from_len_zeroed(len);
|
||||
input.dedup(&mut selected, None);
|
||||
|
||||
let mut expect = vec![false; len];
|
||||
if !expect.is_empty() {
|
||||
expect[0] = true;
|
||||
}
|
||||
check_bitmap(&expect, &selected);
|
||||
|
||||
let mut selected = MutableBitmap::from_len_zeroed(len);
|
||||
let prev = Some(ConstantVector::new(
|
||||
Arc::new(Int32Vector::from_slice(&[8])),
|
||||
1,
|
||||
));
|
||||
input.dedup(&mut selected, prev.as_ref().map(|v| v as _));
|
||||
let expect = vec![false; len];
|
||||
check_bitmap(&expect, &selected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_constant() {
|
||||
for len in 0..5 {
|
||||
check_dedup_constant(len);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_string() {
|
||||
let input = StringVector::from_slice(&["a", "a", "b", "c"]);
|
||||
let mut selected = MutableBitmap::from_len_zeroed(4);
|
||||
input.dedup(&mut selected, None);
|
||||
let expect = vec![true, false, true, true];
|
||||
check_bitmap(&expect, &selected);
|
||||
}
|
||||
|
||||
macro_rules! impl_dedup_date_like_test {
|
||||
($VectorType: ident, $ValueType: ident, $method: ident) => {{
|
||||
use common_time::$ValueType;
|
||||
use $crate::vectors::$VectorType;
|
||||
|
||||
let v = $VectorType::from_iterator([8, 8, 9, 10].into_iter().map($ValueType::$method));
|
||||
let mut selected = MutableBitmap::from_len_zeroed(4);
|
||||
v.dedup(&mut selected, None);
|
||||
let expect = vec![true, false, true, true];
|
||||
check_bitmap(&expect, &selected);
|
||||
}};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dedup_date_like() {
|
||||
impl_dedup_date_like_test!(DateVector, Date, new);
|
||||
impl_dedup_date_like_test!(DateTimeVector, DateTime, new);
|
||||
impl_dedup_date_like_test!(TimestampVector, Timestamp, from_millis);
|
||||
}
|
||||
}
|
||||
114
src/datatypes/src/vectors/operations/filter.rs
Normal file
114
src/datatypes/src/vectors/operations/filter.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
pub(crate) use crate::vectors::constant::filter_constant;
|
||||
|
||||
/// Filters a non-constant vector through arrow's `filter` kernel.
///
/// Expands to an expression that evaluates to `Ok(Arc<$VectorType>)`. Because
/// it uses `?`, it must be expanded inside a function returning the crate's
/// `Result`. Arrow compute failures are wrapped with `ArrowComputeSnafu`.
///
/// - `$vector`: expression exposing `as_arrow()`.
/// - `$VectorType`: vector type rebuilt from the filtered arrow array via
///   `try_from_arrow_array`.
/// - `$filter`: identifier of the boolean filter vector.
macro_rules! filter_non_constant {
    ($vector: expr, $VectorType: ty, $filter: ident) => {{
        use std::sync::Arc;

        use snafu::ResultExt;

        let source = $vector.as_arrow();
        let mask = $filter.as_boolean_array();
        let filtered = arrow::compute::filter::filter(source, mask)
            .context(crate::error::ArrowComputeSnafu)?;
        let vector = <$VectorType>::try_from_arrow_array(filtered)?;
        Ok(Arc::new(vector))
    }};
}
|
||||
|
||||
pub(crate) use filter_non_constant;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::scalars::ScalarVector;
|
||||
use crate::vectors::{
|
||||
BooleanVector, ConstantVector, Int32Vector, NullVector, StringVector, VectorOp, VectorRef,
|
||||
};
|
||||
|
||||
fn check_filter_primitive(expect: &[i32], input: &[i32], filter: &[bool]) {
|
||||
let v = Int32Vector::from_slice(&input);
|
||||
let filter = BooleanVector::from_slice(filter);
|
||||
let out = v.filter(&filter).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(Int32Vector::from_slice(&expect));
|
||||
assert_eq!(expect, out);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter_primitive() {
|
||||
check_filter_primitive(&[], &[], &[]);
|
||||
check_filter_primitive(&[5], &[5], &[true]);
|
||||
check_filter_primitive(&[], &[5], &[false]);
|
||||
check_filter_primitive(&[], &[5, 6], &[false, false]);
|
||||
check_filter_primitive(&[5, 6], &[5, 6], &[true, true]);
|
||||
check_filter_primitive(&[], &[5, 6, 7], &[false, false, false]);
|
||||
check_filter_primitive(&[5], &[5, 6, 7], &[true, false, false]);
|
||||
check_filter_primitive(&[6], &[5, 6, 7], &[false, true, false]);
|
||||
check_filter_primitive(&[7], &[5, 6, 7], &[false, false, true]);
|
||||
check_filter_primitive(&[5, 7], &[5, 6, 7], &[true, false, true]);
|
||||
}
|
||||
|
||||
fn check_filter_constant(expect_length: usize, input_length: usize, filter: &[bool]) {
|
||||
let v = ConstantVector::new(Arc::new(Int32Vector::from_slice(&[123])), input_length);
|
||||
let filter = BooleanVector::from_slice(filter);
|
||||
let out = v.filter(&filter).unwrap();
|
||||
|
||||
assert!(out.is_const());
|
||||
assert_eq!(expect_length, out.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter_constant() {
|
||||
check_filter_constant(0, 0, &[]);
|
||||
check_filter_constant(1, 1, &[true]);
|
||||
check_filter_constant(0, 1, &[false]);
|
||||
check_filter_constant(1, 2, &[false, true]);
|
||||
check_filter_constant(2, 2, &[true, true]);
|
||||
check_filter_constant(1, 4, &[false, false, false, true]);
|
||||
check_filter_constant(2, 4, &[false, true, false, true]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter_scalar() {
|
||||
let v = StringVector::from_slice(&["0", "1", "2", "3"]);
|
||||
let filter = BooleanVector::from_slice(&[false, true, false, true]);
|
||||
let out = v.filter(&filter).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(StringVector::from_slice(&["1", "3"]));
|
||||
assert_eq!(expect, out);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter_null() {
|
||||
let v = NullVector::new(5);
|
||||
let filter = BooleanVector::from_slice(&[false, true, false, true, true]);
|
||||
let out = v.filter(&filter).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new(NullVector::new(3));
|
||||
assert_eq!(expect, out);
|
||||
}
|
||||
|
||||
macro_rules! impl_filter_date_like_test {
|
||||
($VectorType: ident, $ValueType: ident, $method: ident) => {{
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::$ValueType;
|
||||
use $crate::vectors::{$VectorType, VectorRef};
|
||||
|
||||
let v = $VectorType::from_iterator((0..5).map($ValueType::$method));
|
||||
let filter = BooleanVector::from_slice(&[false, true, false, true, true]);
|
||||
let out = v.filter(&filter).unwrap();
|
||||
|
||||
let expect: VectorRef = Arc::new($VectorType::from_iterator(
|
||||
[1, 3, 4].into_iter().map($ValueType::$method),
|
||||
));
|
||||
assert_eq!(expect, out);
|
||||
}};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter_date_like() {
|
||||
impl_filter_date_like_test!(DateVector, Date, new);
|
||||
impl_filter_date_like_test!(DateTimeVector, DateTime, new);
|
||||
impl_filter_date_like_test!(TimestampVector, Timestamp, from_millis);
|
||||
}
|
||||
}
|
||||
108
src/datatypes/src/vectors/operations/replicate.rs
Normal file
108
src/datatypes/src/vectors/operations/replicate.rs
Normal file
@@ -0,0 +1,108 @@
|
||||
use crate::prelude::*;
|
||||
pub(crate) use crate::vectors::constant::replicate_constant;
|
||||
pub(crate) use crate::vectors::date::replicate_date;
|
||||
pub(crate) use crate::vectors::datetime::replicate_datetime;
|
||||
pub(crate) use crate::vectors::null::replicate_null;
|
||||
pub(crate) use crate::vectors::primitive::replicate_primitive;
|
||||
pub(crate) use crate::vectors::timestamp::replicate_timestamp;
|
||||
|
||||
/// Replicates the elements of a scalar vector according to `offsets`.
///
/// The `i`-th element is pushed once for every position in
/// `offsets[i - 1]..offsets[i]` (with the offset before the first element
/// taken as 0). Empty `offsets` yield an empty slice of `c`.
///
/// The builder is sized for the output length (the last offset), not for the
/// input length.
///
/// NOTE(review): the builder is created through
/// `ScalarVectorBuilder::with_capacity`; confirm every vector type routed here
/// has a builder that supports that constructor.
///
/// # Panics
/// Panics if `offsets.len() != c.len()`.
pub(crate) fn replicate_scalar<C: ScalarVector>(c: &C, offsets: &[usize]) -> VectorRef {
    assert_eq!(offsets.len(), c.len());

    let total_len = match offsets.last() {
        Some(last) => *last,
        None => return c.slice(0, 0),
    };
    let mut builder = <<C as ScalarVector>::Builder>::with_capacity(total_len);

    let mut previous_offset = 0;
    for (i, offset) in offsets.iter().enumerate() {
        let data = c.get_data(i);
        for _ in previous_offset..*offset {
            builder.push(data);
        }
        previous_offset = *offset;
    }
    builder.to_vector()
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::vectors::{ConstantVector, Int32Vector, NullVector, StringVector, VectorOp};
|
||||
|
||||
#[test]
|
||||
fn test_replicate_primitive() {
|
||||
let v = Int32Vector::from_iterator(0..5);
|
||||
let offsets = [0, 1, 2, 3, 4];
|
||||
|
||||
let v = v.replicate(&offsets);
|
||||
assert_eq!(4, v.len());
|
||||
|
||||
for i in 0..4 {
|
||||
assert_eq!(Value::Int32(i as i32 + 1), v.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replicate_scalar() {
|
||||
let v = StringVector::from_slice(&["0", "1", "2", "3"]);
|
||||
let offsets = [1, 3, 5, 6];
|
||||
|
||||
let v = v.replicate(&offsets);
|
||||
assert_eq!(6, v.len());
|
||||
|
||||
let expect: VectorRef = Arc::new(StringVector::from_slice(&["0", "1", "1", "2", "2", "3"]));
|
||||
assert_eq!(expect, v);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replicate_constant() {
|
||||
let v = Arc::new(StringVector::from_slice(&["hello"]));
|
||||
let cv = ConstantVector::new(v.clone(), 2);
|
||||
let offsets = [1, 4];
|
||||
|
||||
let cv = cv.replicate(&offsets);
|
||||
assert_eq!(4, cv.len());
|
||||
|
||||
let expect: VectorRef = Arc::new(ConstantVector::new(v, 4));
|
||||
assert_eq!(expect, cv);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replicate_null() {
|
||||
let v = NullVector::new(3);
|
||||
let offsets = [1, 3, 5];
|
||||
|
||||
let v = v.replicate(&offsets);
|
||||
assert_eq!(5, v.len());
|
||||
}
|
||||
|
||||
macro_rules! impl_replicate_date_like_test {
|
||||
($VectorType: ident, $ValueType: ident, $method: ident) => {{
|
||||
use common_time::$ValueType;
|
||||
use $crate::vectors::$VectorType;
|
||||
|
||||
let v = $VectorType::from_iterator((0..5).map($ValueType::$method));
|
||||
let offsets = [0, 1, 2, 3, 4];
|
||||
|
||||
let v = v.replicate(&offsets);
|
||||
assert_eq!(4, v.len());
|
||||
|
||||
for i in 0..4 {
|
||||
assert_eq!(
|
||||
Value::$ValueType($ValueType::$method((i as i32 + 1).into())),
|
||||
v.get(i)
|
||||
);
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replicate_date_like() {
|
||||
impl_replicate_date_like_test!(DateVector, Date, new);
|
||||
impl_replicate_date_like_test!(DateTimeVector, DateTime, new);
|
||||
impl_replicate_date_like_test!(TimestampVector, Timestamp, from_millis);
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,7 @@ use arrow::bitmap::utils::ZipValidity;
|
||||
use serde_json::Value as JsonValue;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::data_type::ConcreteDataType;
|
||||
use crate::data_type::{ConcreteDataType, DataType};
|
||||
use crate::error::ConversionSnafu;
|
||||
use crate::error::{Result, SerializeSnafu};
|
||||
use crate::scalars::{Scalar, ScalarRef};
|
||||
@@ -59,6 +59,14 @@ impl<T: Primitive> PrimitiveVector<T> {
|
||||
array: PrimitiveArray::from_values(iter),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
&self.array
|
||||
}
|
||||
|
||||
fn slice(&self, offset: usize, length: usize) -> Self {
|
||||
Self::from(self.array.slice(offset, length))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: PrimitiveElement> Vector for PrimitiveVector<T> {
|
||||
@@ -99,40 +107,13 @@ impl<T: PrimitiveElement> Vector for PrimitiveVector<T> {
|
||||
}
|
||||
|
||||
fn slice(&self, offset: usize, length: usize) -> VectorRef {
|
||||
Arc::new(Self::from(self.array.slice(offset, length)))
|
||||
Arc::new(self.slice(offset, length))
|
||||
}
|
||||
|
||||
fn get(&self, index: usize) -> Value {
|
||||
vectors::impl_get_for_vector!(self.array, index)
|
||||
}
|
||||
|
||||
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
debug_assert!(
|
||||
offsets.len() == self.len(),
|
||||
"Size of offsets must match size of column"
|
||||
);
|
||||
|
||||
if offsets.is_empty() {
|
||||
return self.slice(0, 0);
|
||||
}
|
||||
|
||||
let mut builder =
|
||||
PrimitiveVectorBuilder::<T>::with_capacity(*offsets.last().unwrap() as usize);
|
||||
|
||||
let mut previous_offset = 0;
|
||||
|
||||
for (i, offset) in offsets.iter().enumerate() {
|
||||
let data = unsafe { self.array.value_unchecked(i) };
|
||||
builder.mutable_array.extend(
|
||||
std::iter::repeat(data)
|
||||
.take(*offset - previous_offset)
|
||||
.map(Option::Some),
|
||||
);
|
||||
previous_offset = *offset;
|
||||
}
|
||||
builder.to_vector()
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
if self.array.is_valid(index) {
|
||||
// Safety: The index have been checked by `is_valid()`.
|
||||
@@ -167,9 +148,7 @@ impl<T: Primitive, Ptr: std::borrow::Borrow<Option<T>>> FromIterator<Ptr> for Pr
|
||||
|
||||
impl<T> ScalarVector for PrimitiveVector<T>
|
||||
where
|
||||
T: Scalar<VectorType = Self> + PrimitiveElement,
|
||||
for<'a> T: ScalarRef<'a, ScalarType = T, VectorType = Self>,
|
||||
for<'a> T: Scalar<RefType<'a> = T>,
|
||||
T: PrimitiveElement,
|
||||
{
|
||||
type OwnedItem = T;
|
||||
type RefItem<'a> = T;
|
||||
@@ -216,16 +195,18 @@ impl<'a, T: Copy> Iterator for PrimitiveIter<'a, T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PrimitiveVectorBuilder<T: PrimitiveElement> {
|
||||
pub(crate) mutable_array: MutablePrimitiveArray<T>,
|
||||
impl<T: PrimitiveElement> Serializable for PrimitiveVector<T> {
|
||||
fn serialize_to_json(&self) -> Result<Vec<JsonValue>> {
|
||||
self.array
|
||||
.iter()
|
||||
.map(serde_json::to_value)
|
||||
.collect::<serde_json::Result<_>>()
|
||||
.context(SerializeSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: PrimitiveElement> PrimitiveVectorBuilder<T> {
|
||||
fn with_capacity(capacity: usize) -> Self {
|
||||
Self {
|
||||
mutable_array: MutablePrimitiveArray::with_capacity(capacity),
|
||||
}
|
||||
}
|
||||
pub struct PrimitiveVectorBuilder<T: PrimitiveElement> {
|
||||
pub(crate) mutable_array: MutablePrimitiveArray<T>,
|
||||
}
|
||||
|
||||
pub type UInt8VectorBuilder = PrimitiveVectorBuilder<u8>;
|
||||
@@ -259,9 +240,7 @@ impl<T: PrimitiveElement> MutableVector for PrimitiveVectorBuilder<T> {
|
||||
}
|
||||
|
||||
fn to_vector(&mut self) -> VectorRef {
|
||||
Arc::new(PrimitiveVector::<T> {
|
||||
array: std::mem::take(&mut self.mutable_array).into(),
|
||||
})
|
||||
Arc::new(self.finish())
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
@@ -304,16 +283,58 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: PrimitiveElement> Serializable for PrimitiveVector<T> {
|
||||
fn serialize_to_json(&self) -> Result<Vec<JsonValue>> {
|
||||
self.array
|
||||
.iter()
|
||||
.map(serde_json::to_value)
|
||||
.collect::<serde_json::Result<_>>()
|
||||
.context(SerializeSnafu)
|
||||
impl<T: PrimitiveElement> PrimitiveVectorBuilder<T> {
|
||||
fn with_type_capacity(data_type: ConcreteDataType, capacity: usize) -> Self {
|
||||
Self {
|
||||
mutable_array: MutablePrimitiveArray::with_capacity_from(
|
||||
capacity,
|
||||
data_type.as_arrow_type(),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_primitive<T: PrimitiveElement>(
|
||||
vector: &PrimitiveVector<T>,
|
||||
offsets: &[usize],
|
||||
) -> VectorRef {
|
||||
Arc::new(replicate_primitive_with_type(
|
||||
vector,
|
||||
offsets,
|
||||
T::build_data_type(),
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_primitive_with_type<T: PrimitiveElement>(
|
||||
vector: &PrimitiveVector<T>,
|
||||
offsets: &[usize],
|
||||
data_type: ConcreteDataType,
|
||||
) -> PrimitiveVector<T> {
|
||||
assert_eq!(offsets.len(), vector.len());
|
||||
|
||||
if offsets.is_empty() {
|
||||
return vector.slice(0, 0);
|
||||
}
|
||||
|
||||
let mut builder = PrimitiveVectorBuilder::<T>::with_type_capacity(
|
||||
data_type,
|
||||
*offsets.last().unwrap() as usize,
|
||||
);
|
||||
|
||||
let mut previous_offset = 0;
|
||||
|
||||
for (i, offset) in offsets.iter().enumerate() {
|
||||
let data = unsafe { vector.array.value_unchecked(i) };
|
||||
builder.mutable_array.extend(
|
||||
std::iter::repeat(data)
|
||||
.take(*offset - previous_offset)
|
||||
.map(Option::Some),
|
||||
);
|
||||
previous_offset = *offset;
|
||||
}
|
||||
builder.finish()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
@@ -425,20 +446,6 @@ mod tests {
|
||||
assert_eq!(Validity::AllValid, vector.validity());
|
||||
}
|
||||
|
||||
#[test]
fn test_replicate() {
    let source = PrimitiveVector::<i32>::from_slice((0..5).collect::<Vec<i32>>());

    // The first element ends at offset 0, so it is dropped; every other
    // element appears exactly once in the output.
    let offsets: [usize; 5] = [0, 1, 2, 3, 4];

    let replicated = source.replicate(&offsets);
    assert_eq!(4, replicated.len());

    for (i, expected) in (1..=4i32).enumerate() {
        assert_eq!(Value::Int32(expected), replicated.get(i));
    }
}
|
||||
|
||||
#[test]
|
||||
fn test_memory_size() {
|
||||
let v = PrimitiveVector::<i32>::from_slice((0..5).collect::<Vec<i32>>());
|
||||
|
||||
@@ -9,7 +9,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
use crate::arrow_array::{MutableStringArray, StringArray};
|
||||
use crate::data_type::ConcreteDataType;
|
||||
use crate::error::{Result, SerializeSnafu};
|
||||
use crate::scalars::{common, ScalarVector, ScalarVectorBuilder};
|
||||
use crate::scalars::{ScalarVector, ScalarVectorBuilder};
|
||||
use crate::serialize::Serializable;
|
||||
use crate::types::StringType;
|
||||
use crate::value::{Value, ValueRef};
|
||||
@@ -21,6 +21,12 @@ pub struct StringVector {
|
||||
array: StringArray,
|
||||
}
|
||||
|
||||
impl StringVector {
|
||||
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
&self.array
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StringArray> for StringVector {
|
||||
fn from(array: StringArray) -> Self {
|
||||
Self { array }
|
||||
@@ -112,10 +118,6 @@ impl Vector for StringVector {
|
||||
vectors::impl_get_for_vector!(self.array, index)
|
||||
}
|
||||
|
||||
/// Replicates this vector according to `offsets` by delegating to
/// `common::replicate_scalar_vector`.
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
common::replicate_scalar_vector(self, offsets)
|
||||
}
|
||||
|
||||
/// Returns a [`ValueRef`] for the element at `index`, produced by the
/// `impl_get_ref_for_vector!` macro over the inner array.
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
vectors::impl_get_ref_for_vector!(self.array, index)
|
||||
}
|
||||
|
||||
@@ -48,6 +48,10 @@ impl TimestampVector {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrows the underlying data as an arrow [`Array`] trait object by
/// forwarding to the inner vector's `as_arrow`.
pub(crate) fn as_arrow(&self) -> &dyn Array {
|
||||
self.array.as_arrow()
|
||||
}
|
||||
}
|
||||
|
||||
impl Vector for TimestampVector {
|
||||
@@ -117,10 +121,6 @@ impl Vector for TimestampVector {
|
||||
}
|
||||
}
|
||||
|
||||
/// Replicates this vector according to `offsets` by forwarding to the inner
/// vector's `replicate`.
// NOTE(review): the result is whatever the inner vector returns; confirm it
// keeps the timestamp type rather than the inner primitive type.
fn replicate(&self, offsets: &[usize]) -> VectorRef {
|
||||
self.array.replicate(offsets)
|
||||
}
|
||||
|
||||
fn get_ref(&self, index: usize) -> ValueRef {
|
||||
match self.array.get(index) {
|
||||
Value::Int64(v) => ValueRef::Timestamp(Timestamp::from_millis(v)),
|
||||
@@ -247,6 +247,15 @@ impl ScalarVectorBuilder for TimestampVectorBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_timestamp(vector: &TimestampVector, offsets: &[usize]) -> VectorRef {
|
||||
let array = crate::vectors::primitive::replicate_primitive_with_type(
|
||||
&vector.array,
|
||||
offsets,
|
||||
vector.data_type(),
|
||||
);
|
||||
Arc::new(TimestampVector { array })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -284,4 +293,13 @@ mod tests {
|
||||
vector.iter_data().collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn test_timestamp_from_arrow() {
    // Round-trip: vector -> arrow array (full-length slice) -> vector.
    let original =
        TimestampVector::from_slice(&[Timestamp::from_millis(1), Timestamp::from_millis(2)]);

    let row_count = original.len();
    let sliced = original.as_arrow().slice(0, row_count);
    let rebuilt = TimestampVector::try_from_arrow_array(&sliced).unwrap();

    assert_eq!(original, rebuilt);
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user