feat(datatypes): implement VectorOp::take (#1115)

* feat: add take index method for VectorOp

* chore: make clippy happy

* chore: make clippy happy

* chore: improve the code

* chore: improve the code

* chore: add take null test

* chore: fix clippy
This commit is contained in:
elijah
2023-03-07 19:27:33 +08:00
committed by GitHub
parent 7169fe2989
commit 819b60ca13
3 changed files with 260 additions and 5 deletions

View File

@@ -16,14 +16,14 @@ use std::any::Any;
use std::fmt;
use std::sync::Arc;
use arrow::array::{Array, ArrayRef};
use snafu::ResultExt;
use arrow::array::{Array, ArrayRef, UInt32Array};
use snafu::{ensure, ResultExt};
use crate::data_type::ConcreteDataType;
use crate::error::{Result, SerializeSnafu};
use crate::error::{self, Result, SerializeSnafu};
use crate::serialize::Serializable;
use crate::value::{Value, ValueRef};
use crate::vectors::{BooleanVector, Helper, Validity, Vector, VectorRef};
use crate::vectors::{BooleanVector, Helper, UInt32Vector, Validity, Vector, VectorRef};
#[derive(Clone)]
pub struct ConstantVector {
@@ -83,6 +83,35 @@ impl ConstantVector {
self.length,
)))
}
/// Takes elements of this constant vector at the positions given by `indices`.
///
/// Every element of a constant vector is the same value, so the result is another
/// [`ConstantVector`] wrapping the same inner vector, with length `indices.len()`.
/// When `indices` is empty the result is `self.slice(0, 0)`.
///
/// # Errors
/// Returns an `UnsupportedOperation` error if `indices` contains a null.
///
/// # Panics
/// Panics if any index is greater than or equal to `self.len()`.
pub(crate) fn take_vector(&self, indices: &UInt32Vector) -> Result<VectorRef> {
    if indices.is_empty() {
        return Ok(self.slice(0, 0));
    }

    ensure!(
        indices.null_count() == 0,
        error::UnsupportedOperationSnafu {
            op: "taking a null index",
            vector_type: self.vector_type_name(),
        }
    );

    let len = self.len();
    let arr = indices.to_arrow_array();
    let indices_arr = arr
        .as_any()
        .downcast_ref::<UInt32Array>()
        .expect("UInt32Vector must convert to a UInt32Array");

    // Compare in `usize`: a constant vector's logical length may exceed `u32::MAX`,
    // and narrowing it with `len as u32` would wrap and reject valid indices.
    // Nulls were rejected above, so every raw value is a real index.
    let out_of_bounds = indices_arr
        .values()
        .iter()
        .any(|&index| index as usize >= len);
    if out_of_bounds {
        panic!("Array index out of bounds, cannot take index out of the length of the array: {len}");
    }

    Ok(Arc::new(ConstantVector::new(
        self.inner().clone(),
        indices.len(),
    )))
}
}
impl Vector for ConstantVector {

View File

@@ -16,6 +16,7 @@ mod cast;
mod filter;
mod find_unique;
mod replicate;
mod take;
use common_base::BitVec;
@@ -24,7 +25,7 @@ use crate::types::LogicalPrimitiveType;
use crate::vectors::constant::ConstantVector;
use crate::vectors::{
BinaryVector, BooleanVector, ConcreteDataType, ListVector, NullVector, PrimitiveVector,
StringVector, Vector, VectorRef,
StringVector, UInt32Vector, Vector, VectorRef,
};
/// Vector compute operations.
@@ -63,6 +64,12 @@ pub trait VectorOp {
///
/// TODO(dennis) describe behaviors in details.
fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef>;
/// Take elements from the vector by the given indices.
///
/// The result holds one element per entry of `indices`, in the order the indices
/// are given. For the implementations that delegate to Arrow's `take` kernel, a
/// null index produces a null element; `ConstantVector` instead returns an
/// `UnsupportedOperation` error when `indices` contains a null.
///
/// # Panics
/// Panics if an index is out of bounds.
fn take(&self, indices: &UInt32Vector) -> Result<VectorRef>;
}
macro_rules! impl_scalar_vector_op {
@@ -84,6 +91,10 @@ macro_rules! impl_scalar_vector_op {
/// Casts this vector to `to_type` by expanding `cast::cast_non_constant!`.
fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
cast::cast_non_constant!(self, to_type)
}
/// Takes elements at `indices` by expanding `take::take_indices!`, which runs
/// Arrow's `take` kernel and converts the result back into `$VectorType`.
fn take(&self, indices: &UInt32Vector) -> Result<VectorRef> {
take::take_indices!(self, $VectorType, indices)
}
}
)+};
}
@@ -108,6 +119,10 @@ impl<T: LogicalPrimitiveType> VectorOp for PrimitiveVector<T> {
/// Casts this primitive vector to `to_type` by expanding `cast::cast_non_constant!`.
fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
cast::cast_non_constant!(self, to_type)
}
/// Takes elements at `indices` by expanding `take::take_indices!`, which runs
/// Arrow's `take` kernel and converts the result back into `PrimitiveVector<T>`.
fn take(&self, indices: &UInt32Vector) -> Result<VectorRef> {
take::take_indices!(self, PrimitiveVector<T>, indices)
}
}
impl VectorOp for NullVector {
@@ -131,6 +146,10 @@ impl VectorOp for NullVector {
}
.fail()
}
/// Takes elements at `indices` by expanding `take::take_indices!`, which runs
/// Arrow's `take` kernel and converts the result back into a `NullVector`.
fn take(&self, indices: &UInt32Vector) -> Result<VectorRef> {
take::take_indices!(self, NullVector, indices)
}
}
impl VectorOp for ConstantVector {
@@ -150,4 +169,8 @@ impl VectorOp for ConstantVector {
/// Casts this constant vector to `to_type` by delegating to `ConstantVector::cast_vector`.
fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
self.cast_vector(to_type)
}
/// Takes elements at `indices` by delegating to `ConstantVector::take_vector`.
fn take(&self, indices: &UInt32Vector) -> Result<VectorRef> {
self.take_vector(indices)
}
}

View File

@@ -0,0 +1,203 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Expands to an expression that gathers the elements of `$vector` at the positions
/// in `$indices` with Arrow's `take` kernel and wraps the result in an `Arc` of
/// `$VectorType`.
///
/// The expansion uses `?`, so it must be placed in a function returning the crate's
/// `Result`; a kernel failure is reported through `ArrowComputeSnafu`.
macro_rules! take_indices {
    ($vector: expr, $VectorType: ty, $indices: ident) => {{
        use snafu::ResultExt;

        let source = $vector.as_arrow();
        let positions = $indices.as_arrow();
        let gathered = arrow::compute::take(source, positions, None)
            .context(crate::error::ArrowComputeSnafu)?;
        let vector = <$VectorType>::try_from_arrow_array(gathered)?;
        Ok(std::sync::Arc::new(vector))
    }};
}
pub(crate) use take_indices;
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::{PrimitiveArray, UInt32Array};
use common_time::{Date, DateTime};
use crate::prelude::VectorRef;
use crate::scalars::ScalarVector;
use crate::timestamp::{
TimestampMicrosecond, TimestampMillisecond, TimestampNanosecond, TimestampSecond,
};
use crate::types::{LogicalPrimitiveType, WrapperType};
use crate::vectors::operations::VectorOp;
use crate::vectors::{
BooleanVector, ConstantVector, Int32Vector, NullVector, PrimitiveVector, StringVector,
UInt32Vector,
};
/// Builds a `PrimitiveVector<T>` from `input`, takes `indices` from it and asserts
/// that the outcome equals a vector built from `expect`.
fn check_take_primitive<T>(
    input: Vec<Option<T::Native>>,
    indices: Vec<Option<u32>>,
    expect: Vec<Option<T::Native>>,
) where
    T: LogicalPrimitiveType,
    PrimitiveArray<T::ArrowPrimitive>: From<Vec<Option<T::Native>>>,
{
    // Both the source and the expected vector are built the same way.
    let build = |values: Vec<Option<T::Native>>| {
        PrimitiveVector::<T>::new(PrimitiveArray::<T::ArrowPrimitive>::from(values))
    };

    let source = build(input);
    let positions = UInt32Vector::new(UInt32Array::from(indices));
    let actual = source.take(&positions).unwrap();

    let wanted: VectorRef = Arc::new(build(expect));
    assert_eq!(wanted, actual);
}
/// Checks `take` on a date/time-like vector: builds `$VectorType` from the values
/// `0..5` (converted with `$ValueType::$method`), takes positions `[3, 0, 1, 4]` and
/// compares against a vector built directly from those same values.
macro_rules! take_time_like_test {
    ($VectorType: ident, $ValueType: ident, $method: ident) => {{
        use $crate::vectors::{$VectorType, VectorRef};

        // The expected values are written as their own literal because they are fed
        // to `$ValueType::$method` rather than used as `u32` indices.
        let wanted: VectorRef = Arc::new($VectorType::from_iterator(
            [3, 0, 1, 4].into_iter().map($ValueType::$method),
        ));

        let source = $VectorType::from_iterator((0..5).map($ValueType::$method));
        let positions = UInt32Vector::from_slice(&[3, 0, 1, 4]);
        let taken = source.take(&positions).unwrap();

        assert_eq!(wanted, taken);
    }};
}
#[test]
fn test_take_primitive() {
    // The int32 and float32 cases share one index list: position 1 of each input
    // holds a null element and the second index is itself null, so both of those
    // output slots are expected to be null.
    let shared_indices = vec![Some(3), None, Some(0), Some(1), Some(4)];

    // nullable int32
    check_take_primitive::<crate::types::Int32Type>(
        vec![Some(1), None, Some(3), Some(4), Some(-5)],
        shared_indices.clone(),
        vec![Some(4), None, Some(1), None, Some(-5)],
    );

    // nullable float32
    check_take_primitive::<crate::types::Float32Type>(
        vec![Some(3.24), None, Some(1.34), Some(4.13), Some(5.13)],
        shared_indices,
        vec![Some(4.13), None, Some(3.24), None, Some(5.13)],
    );

    // nullable uint32
    check_take_primitive::<crate::types::UInt32Type>(
        vec![Some(0), None, Some(2), Some(3), Some(4)],
        vec![Some(4), None, Some(2), Some(1), Some(3)],
        vec![Some(4), None, Some(2), None, Some(3)],
    );

    // date- and time-like types
    take_time_like_test!(DateVector, Date, new);
    take_time_like_test!(DateTimeVector, DateTime, new);
    take_time_like_test!(TimestampSecondVector, TimestampSecond, from_native);
    take_time_like_test!(
        TimestampMillisecondVector,
        TimestampMillisecond,
        from_native
    );
    take_time_like_test!(
        TimestampMicrosecondVector,
        TimestampMicrosecond,
        from_native
    );
    take_time_like_test!(TimestampNanosecondVector, TimestampNanosecond, from_native);
}
fn check_take_constant(expect_length: usize, input_length: usize, indices: &[u32]) {
let v = ConstantVector::new(Arc::new(Int32Vector::from_slice([111])), input_length);
let indices = UInt32Vector::from_slice(indices);
let out = v.take(&indices).unwrap();
assert!(out.is_const());
assert_eq!(expect_length, out.len());
}
#[test]
fn test_take_constant() {
    // Each case is (expected output length, input length, indices to take).
    let cases: [(usize, usize, &[u32]); 4] = [
        (2, 5, &[3, 4]),
        (3, 10, &[1, 2, 3]),
        (4, 10, &[1, 5, 3, 6]),
        (5, 10, &[1, 9, 8, 7, 3]),
    ];

    for (expect_length, input_length, indices) in cases {
        check_take_constant(expect_length, input_length, indices);
    }
}
// Index 5 is one past the end of a constant vector of length 5. The expected
// message pins the panic to the bounds check in `ConstantVector::take_vector`,
// so an unrelated panic (e.g. while building the fixtures) cannot make this pass.
#[test]
#[should_panic(expected = "Array index out of bounds")]
fn test_take_constant_out_of_index() {
    check_take_constant(2, 5, &[3, 5]);
}
#[test]
#[should_panic]
fn test_take_out_of_index() {
    // Indices 5 and 6 lie past the end of the five-element source vector.
    let source = Int32Vector::from_slice([1, 2, 3, 4, 5]);
    let indices = UInt32Vector::from_slice([1, 5, 6]);
    source.take(&indices).unwrap();
}
#[test]
fn test_take_null() {
    // Taking three positions from a null vector gives a null vector of length three.
    let positions = UInt32Vector::from_slice([1, 3, 2]);
    let taken = NullVector::new(5).take(&positions).unwrap();

    let wanted: VectorRef = Arc::new(NullVector::new(3));
    assert_eq!(wanted, taken);
}
#[test]
fn test_take_scalar() {
    // Elements come back in index order, not in source order.
    let source = StringVector::from_slice(&["0", "1", "2", "3"]);
    let positions = UInt32Vector::from_slice([1, 3, 2]);
    let taken = source.take(&positions).unwrap();

    let wanted: VectorRef = Arc::new(StringVector::from_slice(&["1", "3", "2"]));
    assert_eq!(wanted, taken);
}
#[test]
fn test_take_bool() {
    // Non-nullable source with non-null indices.
    {
        let source = BooleanVector::from_slice(&[false, true, false, true, false, false, true]);
        let positions = UInt32Vector::from_slice([1, 3, 5, 6]);
        let taken = source.take(&positions).unwrap();

        let wanted: VectorRef = Arc::new(BooleanVector::from_slice(&[true, true, false, true]));
        assert_eq!(taken, wanted);
    }

    // Nullable source with a null index: index 1 selects a null element and the
    // null index itself yields a null, so the first two outputs are null.
    {
        let source = BooleanVector::from(vec![
            Some(true),
            None,
            Some(false),
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
        ]);
        let positions = UInt32Vector::from(vec![Some(1), None, Some(3), Some(5), Some(6)]);
        let taken = source.take(&positions).unwrap();

        let wanted: VectorRef = Arc::new(BooleanVector::from(vec![
            None,
            None,
            Some(true),
            Some(false),
            Some(true),
        ]));
        assert_eq!(taken, wanted);
    }
}
}