mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 07:12:54 +00:00
feat: Implements diff accumulator using WrapperType (#715)
* feat: Remove usage of opaque error from common::recordbatch * feat: Remove opaque error from common::query * feat: Fix diff compiler errors Now common_function just use common_query's Error and Result. Adds a LargestType associated type to LogicalPrimitiveType to get the largest type a logical primitive type can cast to. * feat: Remove LargestType from NativeType trait * chore: Update comments * feat: Restrict Scalar::RefType of WrapperType to itself Add trait bound `for<'a> Scalar<RefType<'a> = Self>` to WrapperType * chore: Address CR comments * chore: Format codes
This commit is contained in:
@@ -1,69 +0,0 @@
|
||||
// Copyright 2022 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::prelude::*;
|
||||
pub use common_query::error::{Error, Result};
|
||||
use datatypes::error::Error as DataTypeError;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum InnerError {
|
||||
#[snafu(display("Fail to get scalar vector, {}", source))]
|
||||
GetScalarVector {
|
||||
source: DataTypeError,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for InnerError {
|
||||
fn backtrace_opt(&self) -> Option<&Backtrace> {
|
||||
ErrorCompat::backtrace(self)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerError> for Error {
|
||||
fn from(err: InnerError) -> Self {
|
||||
Self::new(err)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use snafu::GenerateImplicitData;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn raise_datatype_error() -> std::result::Result<(), DataTypeError> {
|
||||
Err(DataTypeError::Conversion {
|
||||
from: "test".to_string(),
|
||||
backtrace: Backtrace::generate(),
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_scalar_vector_error() {
|
||||
let err: Error = raise_datatype_error()
|
||||
.context(GetScalarVectorSnafu)
|
||||
.err()
|
||||
.unwrap()
|
||||
.into();
|
||||
assert!(err.backtrace_opt().is_some());
|
||||
}
|
||||
}
|
||||
@@ -12,5 +12,4 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod error;
|
||||
pub mod scalars;
|
||||
|
||||
@@ -22,40 +22,41 @@ use common_query::error::{
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::prelude::*;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::types::PrimitiveType;
|
||||
use datatypes::types::{LogicalPrimitiveType, WrapperType};
|
||||
use datatypes::value::ListValue;
|
||||
use datatypes::vectors::{ConstantVector, ListVector};
|
||||
use datatypes::vectors::{ConstantVector, Helper, ListVector};
|
||||
use datatypes::with_match_primitive_type_id;
|
||||
use num_traits::AsPrimitive;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
// https://numpy.org/doc/stable/reference/generated/numpy.diff.html
|
||||
// I is the input type, O is the output type.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Diff<T, SubT>
|
||||
pub struct Diff<I, O>
|
||||
where
|
||||
T: Primitive + AsPrimitive<SubT>,
|
||||
SubT: Primitive + std::ops::Sub<Output = SubT>,
|
||||
I: WrapperType,
|
||||
O: WrapperType,
|
||||
{
|
||||
values: Vec<T>,
|
||||
_phantom: PhantomData<SubT>,
|
||||
values: Vec<I>,
|
||||
_phantom: PhantomData<O>,
|
||||
}
|
||||
|
||||
impl<T, SubT> Diff<T, SubT>
|
||||
impl<I, O> Diff<I, O>
|
||||
where
|
||||
T: Primitive + AsPrimitive<SubT>,
|
||||
SubT: Primitive + std::ops::Sub<Output = SubT>,
|
||||
I: WrapperType,
|
||||
O: WrapperType,
|
||||
{
|
||||
fn push(&mut self, value: T) {
|
||||
fn push(&mut self, value: I) {
|
||||
self.values.push(value);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, SubT> Accumulator for Diff<T, SubT>
|
||||
impl<I, O> Accumulator for Diff<I, O>
|
||||
where
|
||||
T: Primitive + AsPrimitive<SubT>,
|
||||
for<'a> T: Scalar<RefType<'a> = T>,
|
||||
SubT: Primitive + std::ops::Sub<Output = SubT>,
|
||||
for<'a> SubT: Scalar<RefType<'a> = SubT>,
|
||||
I: WrapperType,
|
||||
O: WrapperType,
|
||||
I::Native: AsPrimitive<O::Native>,
|
||||
O::Native: std::ops::Sub<Output = O::Native>,
|
||||
{
|
||||
fn state(&self) -> Result<Vec<Value>> {
|
||||
let nums = self
|
||||
@@ -65,7 +66,7 @@ where
|
||||
.collect::<Vec<Value>>();
|
||||
Ok(vec![Value::List(ListValue::new(
|
||||
Some(Box::new(nums)),
|
||||
T::default().into().data_type(),
|
||||
I::LogicalType::build_data_type(),
|
||||
))])
|
||||
}
|
||||
|
||||
@@ -78,12 +79,12 @@ where
|
||||
|
||||
let column = &values[0];
|
||||
let mut len = 1;
|
||||
let column: &<T as Scalar>::VectorType = if column.is_const() {
|
||||
let column: &<I as Scalar>::VectorType = if column.is_const() {
|
||||
len = column.len();
|
||||
let column: &ConstantVector = unsafe { VectorHelper::static_cast(column) };
|
||||
unsafe { VectorHelper::static_cast(column.inner()) }
|
||||
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
|
||||
unsafe { Helper::static_cast(column.inner()) }
|
||||
} else {
|
||||
unsafe { VectorHelper::static_cast(column) }
|
||||
unsafe { Helper::static_cast(column) }
|
||||
};
|
||||
(0..len).for_each(|_| {
|
||||
for v in column.iter_data().flatten() {
|
||||
@@ -109,8 +110,9 @@ where
|
||||
),
|
||||
})?;
|
||||
for state in states.values_iter() {
|
||||
let state = state.context(FromScalarValueSnafu)?;
|
||||
self.update_batch(&[state])?
|
||||
if let Some(state) = state.context(FromScalarValueSnafu)? {
|
||||
self.update_batch(&[state])?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -122,11 +124,14 @@ where
|
||||
let diff = self
|
||||
.values
|
||||
.windows(2)
|
||||
.map(|x| (x[1].as_() - x[0].as_()).into())
|
||||
.map(|x| {
|
||||
let native = x[1].into_native().as_() - x[0].into_native().as_();
|
||||
O::from_native(native).into()
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
let diff = Value::List(ListValue::new(
|
||||
Some(Box::new(diff)),
|
||||
SubT::default().into().data_type(),
|
||||
O::LogicalType::build_data_type(),
|
||||
));
|
||||
Ok(diff)
|
||||
}
|
||||
@@ -143,7 +148,7 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator {
|
||||
with_match_primitive_type_id!(
|
||||
input_type.logical_type_id(),
|
||||
|$S| {
|
||||
Ok(Box::new(Diff::<$S,<$S as Primitive>::LargestType>::default()))
|
||||
Ok(Box::new(Diff::<<$S as LogicalPrimitiveType>::Wrapper, <<$S as LogicalPrimitiveType>::LargestType as LogicalPrimitiveType>::Wrapper>::default()))
|
||||
},
|
||||
{
|
||||
let err_msg = format!(
|
||||
@@ -163,7 +168,7 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator {
|
||||
with_match_primitive_type_id!(
|
||||
input_types[0].logical_type_id(),
|
||||
|$S| {
|
||||
Ok(ConcreteDataType::list_datatype(PrimitiveType::<<$S as Primitive>::LargestType>::default().into()))
|
||||
Ok(ConcreteDataType::list_datatype($S::default().into()))
|
||||
},
|
||||
{
|
||||
unreachable!()
|
||||
@@ -177,7 +182,7 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator {
|
||||
with_match_primitive_type_id!(
|
||||
input_types[0].logical_type_id(),
|
||||
|$S| {
|
||||
Ok(vec![ConcreteDataType::list_datatype(PrimitiveType::<$S>::default().into())])
|
||||
Ok(vec![ConcreteDataType::list_datatype($S::default().into())])
|
||||
},
|
||||
{
|
||||
unreachable!()
|
||||
@@ -188,9 +193,10 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use datatypes::vectors::PrimitiveVector;
|
||||
use datatypes::vectors::Int32Vector;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_update_batch() {
|
||||
// test update empty batch, expect not updating anything
|
||||
@@ -201,21 +207,19 @@ mod test {
|
||||
|
||||
// test update one not-null value
|
||||
let mut diff = Diff::<i32, i64>::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(PrimitiveVector::<i32>::from(vec![Some(42)]))];
|
||||
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Some(42)]))];
|
||||
assert!(diff.update_batch(&v).is_ok());
|
||||
assert_eq!(Value::Null, diff.evaluate().unwrap());
|
||||
|
||||
// test update one null value
|
||||
let mut diff = Diff::<i32, i64>::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(PrimitiveVector::<i32>::from(vec![
|
||||
Option::<i32>::None,
|
||||
]))];
|
||||
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Option::<i32>::None]))];
|
||||
assert!(diff.update_batch(&v).is_ok());
|
||||
assert_eq!(Value::Null, diff.evaluate().unwrap());
|
||||
|
||||
// test update no null-value batch
|
||||
let mut diff = Diff::<i32, i64>::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(PrimitiveVector::<i32>::from(vec![
|
||||
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
|
||||
Some(-1i32),
|
||||
Some(1),
|
||||
Some(2),
|
||||
@@ -232,7 +236,7 @@ mod test {
|
||||
|
||||
// test update null-value batch
|
||||
let mut diff = Diff::<i32, i64>::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(PrimitiveVector::<i32>::from(vec![
|
||||
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
|
||||
Some(-2i32),
|
||||
None,
|
||||
Some(3),
|
||||
@@ -251,7 +255,7 @@ mod test {
|
||||
// test update with constant vector
|
||||
let mut diff = Diff::<i32, i64>::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
|
||||
Arc::new(PrimitiveVector::<i32>::from_vec(vec![4])),
|
||||
Arc::new(Int32Vector::from_vec(vec![4])),
|
||||
4,
|
||||
))];
|
||||
let values = vec![Value::from(0_i64), Value::from(0_i64), Value::from(0_i64)];
|
||||
|
||||
@@ -16,12 +16,11 @@ use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono_tz::Tz;
|
||||
use common_query::error::Result;
|
||||
use common_query::prelude::Signature;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::vectors::VectorRef;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FunctionContext {
|
||||
pub tz: Tz,
|
||||
|
||||
@@ -19,7 +19,8 @@ use common_query::prelude::{
|
||||
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUdf, ScalarValue,
|
||||
};
|
||||
use datatypes::error::Error as DataTypeError;
|
||||
use datatypes::prelude::{ConcreteDataType, VectorHelper};
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::vectors::Helper;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::scalars::function::{FunctionContext, FunctionRef};
|
||||
@@ -47,7 +48,7 @@ pub fn create_udf(func: FunctionRef) -> ScalarUdf {
|
||||
let args: Result<Vec<_>, DataTypeError> = args
|
||||
.iter()
|
||||
.map(|arg| match arg {
|
||||
ColumnarValue::Scalar(v) => VectorHelper::try_from_scalar_value(v.clone(), rows),
|
||||
ColumnarValue::Scalar(v) => Helper::try_from_scalar_value(v.clone(), rows),
|
||||
ColumnarValue::Vector(v) => Ok(v.clone()),
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -23,17 +23,9 @@ use datatypes::error::Error as DataTypeError;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use statrs::StatsError;
|
||||
|
||||
common_error::define_opaque_error!(Error);
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum InnerError {
|
||||
#[snafu(display("Fail to cast array to {:?}, source: {}", typ, source))]
|
||||
TypeCast {
|
||||
source: ArrowError,
|
||||
typ: arrow::datatypes::DataType,
|
||||
},
|
||||
|
||||
pub enum Error {
|
||||
#[snafu(display("Fail to execute function, source: {}", source))]
|
||||
ExecuteFunction {
|
||||
source: DataFusionError,
|
||||
@@ -135,43 +127,56 @@ pub enum InnerError {
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Fail to cast array to {:?}, source: {}", typ, source))]
|
||||
TypeCast {
|
||||
source: ArrowError,
|
||||
typ: arrow::datatypes::DataType,
|
||||
},
|
||||
|
||||
#[snafu(display("Query engine fail to cast value: {}", source))]
|
||||
ToScalarValue {
|
||||
#[snafu(backtrace)]
|
||||
source: DataTypeError,
|
||||
},
|
||||
|
||||
#[snafu(display("Fail to get scalar vector, {}", source))]
|
||||
GetScalarVector {
|
||||
#[snafu(backtrace)]
|
||||
source: DataTypeError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl ErrorExt for InnerError {
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
InnerError::ExecuteFunction { .. }
|
||||
| InnerError::GenerateFunction { .. }
|
||||
| InnerError::CreateAccumulator { .. }
|
||||
| InnerError::DowncastVector { .. }
|
||||
| InnerError::InvalidInputState { .. }
|
||||
| InnerError::InvalidInputCol { .. }
|
||||
| InnerError::BadAccumulatorImpl { .. }
|
||||
| InnerError::ToScalarValue { .. } => StatusCode::EngineExecuteQuery,
|
||||
Error::ExecuteFunction { .. }
|
||||
| Error::GenerateFunction { .. }
|
||||
| Error::CreateAccumulator { .. }
|
||||
| Error::DowncastVector { .. }
|
||||
| Error::InvalidInputState { .. }
|
||||
| Error::InvalidInputCol { .. }
|
||||
| Error::BadAccumulatorImpl { .. }
|
||||
| Error::ToScalarValue { .. }
|
||||
| Error::GetScalarVector { .. } => StatusCode::EngineExecuteQuery,
|
||||
|
||||
InnerError::InvalidInputs { source, .. }
|
||||
| InnerError::IntoVector { source, .. }
|
||||
| InnerError::FromScalarValue { source }
|
||||
| InnerError::ConvertArrowSchema { source }
|
||||
| InnerError::FromArrowArray { source } => source.status_code(),
|
||||
Error::InvalidInputs { source, .. }
|
||||
| Error::IntoVector { source, .. }
|
||||
| Error::FromScalarValue { source }
|
||||
| Error::ConvertArrowSchema { source }
|
||||
| Error::FromArrowArray { source } => source.status_code(),
|
||||
|
||||
InnerError::ExecuteRepeatedly { .. }
|
||||
| InnerError::GeneralDataFusion { .. }
|
||||
| InnerError::DataFusionExecutionPlan { .. } => StatusCode::Unexpected,
|
||||
Error::ExecuteRepeatedly { .. }
|
||||
| Error::GeneralDataFusion { .. }
|
||||
| Error::DataFusionExecutionPlan { .. } => StatusCode::Unexpected,
|
||||
|
||||
InnerError::UnsupportedInputDataType { .. } | InnerError::TypeCast { .. } => {
|
||||
Error::UnsupportedInputDataType { .. } | Error::TypeCast { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
|
||||
InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
|
||||
InnerError::ExecutePhysicalPlan { source } => source.status_code(),
|
||||
Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
|
||||
Error::ExecutePhysicalPlan { source } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,12 +189,6 @@ impl ErrorExt for InnerError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerError> for Error {
|
||||
fn from(e: InnerError) -> Error {
|
||||
Error::new(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Error> for DataFusionError {
|
||||
fn from(e: Error) -> DataFusionError {
|
||||
DataFusionError::External(Box::new(e))
|
||||
@@ -198,7 +197,7 @@ impl From<Error> for DataFusionError {
|
||||
|
||||
impl From<BoxedError> for Error {
|
||||
fn from(source: BoxedError) -> Self {
|
||||
InnerError::ExecutePhysicalPlan { source }.into()
|
||||
Error::ExecutePhysicalPlan { source }.into()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,7 +213,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn assert_error(err: &Error, code: StatusCode) {
|
||||
let inner_err = err.as_any().downcast_ref::<InnerError>().unwrap();
|
||||
let inner_err = err.as_any().downcast_ref::<Error>().unwrap();
|
||||
assert_eq!(code, inner_err.status_code());
|
||||
assert!(inner_err.backtrace_opt().is_some());
|
||||
}
|
||||
@@ -250,14 +249,14 @@ mod tests {
|
||||
.err()
|
||||
.unwrap()
|
||||
.into();
|
||||
assert_eq!(error.inner.status_code(), StatusCode::Unexpected);
|
||||
assert_eq!(error.status_code(), StatusCode::Unexpected);
|
||||
assert!(error.backtrace_opt().is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_df_recordbatch_stream_error() {
|
||||
let result: std::result::Result<i32, common_recordbatch::error::Error> =
|
||||
Err(common_recordbatch::error::InnerError::PollStream {
|
||||
Err(common_recordbatch::error::Error::PollStream {
|
||||
source: ArrowError::DivideByZero,
|
||||
backtrace: Backtrace::generate(),
|
||||
}
|
||||
@@ -267,7 +266,7 @@ mod tests {
|
||||
.err()
|
||||
.unwrap()
|
||||
.into();
|
||||
assert_eq!(error.inner.status_code(), StatusCode::Internal);
|
||||
assert_eq!(error.status_code(), StatusCode::Internal);
|
||||
assert!(error.backtrace_opt().is_some());
|
||||
}
|
||||
|
||||
|
||||
@@ -17,12 +17,10 @@
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use datafusion_common::Result as DfResult;
|
||||
use datafusion_expr::{Accumulator as DfAccumulator, AggregateState};
|
||||
use datatypes::arrow::array::ArrayRef;
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::value::ListValue;
|
||||
use datatypes::vectors::{Helper as VectorHelper, VectorRef};
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -135,8 +133,7 @@ impl DfAccumulator for DfAccumulatorAdaptor {
|
||||
return error::BadAccumulatorImplSnafu {
|
||||
err_msg: format!("Accumulator {:?} returned state values size do not match its state types size.", self),
|
||||
}
|
||||
.fail()
|
||||
.map_err(Error::from)?;
|
||||
.fail()?;
|
||||
}
|
||||
Ok(state_values
|
||||
.into_iter()
|
||||
@@ -147,31 +144,26 @@ impl DfAccumulator for DfAccumulatorAdaptor {
|
||||
.context(error::ToScalarValueSnafu)?;
|
||||
Ok(AggregateState::Scalar(scalar))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.map_err(Error::from)?)
|
||||
.collect::<Result<Vec<_>>>()?)
|
||||
}
|
||||
|
||||
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
|
||||
let vectors = VectorHelper::try_into_vectors(values)
|
||||
.context(FromScalarValueSnafu)
|
||||
.map_err(Error::from)?;
|
||||
self.accumulator
|
||||
.update_batch(&vectors)
|
||||
.map_err(|e| e.into())
|
||||
let vectors = VectorHelper::try_into_vectors(values).context(FromScalarValueSnafu)?;
|
||||
self.accumulator.update_batch(&vectors)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn merge_batch(&mut self, states: &[ArrayRef]) -> DfResult<()> {
|
||||
let mut vectors = Vec::with_capacity(states.len());
|
||||
for array in states.iter() {
|
||||
vectors.push(
|
||||
VectorHelper::try_into_vector(array)
|
||||
.context(IntoVectorSnafu {
|
||||
data_type: array.data_type().clone(),
|
||||
})
|
||||
.map_err(Error::from)?,
|
||||
VectorHelper::try_into_vector(array).context(IntoVectorSnafu {
|
||||
data_type: array.data_type().clone(),
|
||||
})?,
|
||||
);
|
||||
}
|
||||
self.accumulator.merge_batch(&vectors).map_err(|e| e.into())
|
||||
self.accumulator.merge_batch(&vectors)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn evaluate(&self) -> DfResult<ScalarValue> {
|
||||
|
||||
@@ -168,8 +168,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
|
||||
let schema: SchemaRef = Arc::new(
|
||||
df_schema
|
||||
.try_into()
|
||||
.context(error::ConvertArrowSchemaSnafu)
|
||||
.map_err(error::Error::from)?,
|
||||
.context(error::ConvertArrowSchemaSnafu)?,
|
||||
);
|
||||
let children = children
|
||||
.into_iter()
|
||||
|
||||
@@ -17,13 +17,12 @@ use std::any::Any;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::prelude::*;
|
||||
common_error::define_opaque_error!(Error);
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum InnerError {
|
||||
pub enum Error {
|
||||
#[snafu(display("Fail to create datafusion record batch, source: {}", source))]
|
||||
NewDfRecordBatch {
|
||||
source: datatypes::arrow::error::ArrowError,
|
||||
@@ -67,19 +66,19 @@ pub enum InnerError {
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for InnerError {
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
InnerError::NewDfRecordBatch { .. } => StatusCode::InvalidArguments,
|
||||
Error::NewDfRecordBatch { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
InnerError::DataTypes { .. }
|
||||
| InnerError::CreateRecordBatches { .. }
|
||||
| InnerError::PollStream { .. }
|
||||
| InnerError::Format { .. } => StatusCode::Internal,
|
||||
Error::DataTypes { .. }
|
||||
| Error::CreateRecordBatches { .. }
|
||||
| Error::PollStream { .. }
|
||||
| Error::Format { .. } => StatusCode::Internal,
|
||||
|
||||
InnerError::External { source } => source.status_code(),
|
||||
Error::External { source } => source.status_code(),
|
||||
|
||||
InnerError::SchemaConversion { source, .. } => source.status_code(),
|
||||
Error::SchemaConversion { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,9 +90,3 @@ impl ErrorExt for InnerError {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerError> for Error {
|
||||
fn from(e: InnerError) -> Error {
|
||||
Error::new(e)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ mod tests {
|
||||
use futures::Stream;
|
||||
|
||||
use super::*;
|
||||
use crate::{RecordBatchStream};
|
||||
use crate::RecordBatchStream;
|
||||
|
||||
struct MockRecordBatchStream {
|
||||
batch: Option<RecordBatch>,
|
||||
|
||||
@@ -59,6 +59,7 @@ impl LogicalPrimitiveType for DateType {
|
||||
type ArrowPrimitive = Date32Type;
|
||||
type Native = i32;
|
||||
type Wrapper = Date;
|
||||
type LargestType = Self;
|
||||
|
||||
fn build_data_type() -> ConcreteDataType {
|
||||
ConcreteDataType::date_datatype()
|
||||
|
||||
@@ -57,6 +57,7 @@ impl LogicalPrimitiveType for DateTimeType {
|
||||
type ArrowPrimitive = Date64Type;
|
||||
type Native = i64;
|
||||
type Wrapper = DateTime;
|
||||
type LargestType = Self;
|
||||
|
||||
fn build_data_type() -> ConcreteDataType {
|
||||
ConcreteDataType::datetime_datatype()
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt;
|
||||
|
||||
use arrow::datatypes::{ArrowNativeType, ArrowPrimitiveType, DataType as ArrowDataType};
|
||||
use common_time::{Date, DateTime};
|
||||
@@ -29,36 +30,34 @@ use crate::value::{Value, ValueRef};
|
||||
use crate::vectors::{MutableVector, PrimitiveVector, PrimitiveVectorBuilder, Vector};
|
||||
|
||||
/// Data types that can be used as arrow's native type.
|
||||
pub trait NativeType: ArrowNativeType + NumCast {
|
||||
/// Largest numeric type this primitive type can be cast to.
|
||||
type LargestType: NativeType;
|
||||
}
|
||||
pub trait NativeType: ArrowNativeType + NumCast {}
|
||||
|
||||
macro_rules! impl_native_type {
|
||||
($Type: ident, $LargestType: ident) => {
|
||||
impl NativeType for $Type {
|
||||
type LargestType = $LargestType;
|
||||
}
|
||||
($Type: ident) => {
|
||||
impl NativeType for $Type {}
|
||||
};
|
||||
}
|
||||
|
||||
impl_native_type!(u8, u64);
|
||||
impl_native_type!(u16, u64);
|
||||
impl_native_type!(u32, u64);
|
||||
impl_native_type!(u64, u64);
|
||||
impl_native_type!(i8, i64);
|
||||
impl_native_type!(i16, i64);
|
||||
impl_native_type!(i32, i64);
|
||||
impl_native_type!(i64, i64);
|
||||
impl_native_type!(f32, f64);
|
||||
impl_native_type!(f64, f64);
|
||||
impl_native_type!(u8);
|
||||
impl_native_type!(u16);
|
||||
impl_native_type!(u32);
|
||||
impl_native_type!(u64);
|
||||
impl_native_type!(i8);
|
||||
impl_native_type!(i16);
|
||||
impl_native_type!(i32);
|
||||
impl_native_type!(i64);
|
||||
impl_native_type!(f32);
|
||||
impl_native_type!(f64);
|
||||
|
||||
/// Represents the wrapper type that wraps a native type using the `newtype pattern`,
|
||||
/// such as [Date](`common_time::Date`) is a wrapper type for the underlying native
|
||||
/// type `i32`.
|
||||
pub trait WrapperType:
|
||||
Copy
|
||||
+ Scalar
|
||||
+ Send
|
||||
+ Sync
|
||||
+ fmt::Debug
|
||||
+ for<'a> Scalar<RefType<'a> = Self>
|
||||
+ PartialEq
|
||||
+ Into<Value>
|
||||
+ Into<ValueRef<'static>>
|
||||
@@ -87,6 +86,8 @@ pub trait LogicalPrimitiveType: 'static + Sized {
|
||||
type Wrapper: WrapperType<LogicalType = Self, Native = Self::Native>
|
||||
+ for<'a> Scalar<VectorType = PrimitiveVector<Self>, RefType<'a> = Self::Wrapper>
|
||||
+ for<'a> ScalarRef<'a, ScalarType = Self::Wrapper>;
|
||||
/// Largest type this primitive type can cast to.
|
||||
type LargestType: LogicalPrimitiveType;
|
||||
|
||||
/// Construct the data type struct.
|
||||
fn build_data_type() -> ConcreteDataType;
|
||||
@@ -188,7 +189,7 @@ impl WrapperType for DateTime {
|
||||
}
|
||||
|
||||
macro_rules! define_logical_primitive_type {
|
||||
($Native: ident, $TypeId: ident, $DataType: ident) => {
|
||||
($Native: ident, $TypeId: ident, $DataType: ident, $Largest: ident) => {
|
||||
// We need to define it as an empty struct `struct DataType {}` instead of a struct-unit
|
||||
// `struct DataType;` to ensure the serialized JSON string is compatible with previous
|
||||
// implementation.
|
||||
@@ -199,6 +200,7 @@ macro_rules! define_logical_primitive_type {
|
||||
type ArrowPrimitive = arrow::datatypes::$DataType;
|
||||
type Native = $Native;
|
||||
type Wrapper = $Native;
|
||||
type LargestType = $Largest;
|
||||
|
||||
fn build_data_type() -> ConcreteDataType {
|
||||
ConcreteDataType::$TypeId($DataType::default())
|
||||
@@ -240,8 +242,8 @@ macro_rules! define_logical_primitive_type {
|
||||
}
|
||||
|
||||
macro_rules! define_non_timestamp_primitive {
|
||||
($Native: ident, $TypeId: ident, $DataType: ident) => {
|
||||
define_logical_primitive_type!($Native, $TypeId, $DataType);
|
||||
($Native: ident, $TypeId: ident, $DataType: ident, $Largest: ident) => {
|
||||
define_logical_primitive_type!($Native, $TypeId, $DataType, $Largest);
|
||||
|
||||
impl DataType for $DataType {
|
||||
fn name(&self) -> &str {
|
||||
@@ -271,18 +273,18 @@ macro_rules! define_non_timestamp_primitive {
|
||||
};
|
||||
}
|
||||
|
||||
define_non_timestamp_primitive!(u8, UInt8, UInt8Type);
|
||||
define_non_timestamp_primitive!(u16, UInt16, UInt16Type);
|
||||
define_non_timestamp_primitive!(u32, UInt32, UInt32Type);
|
||||
define_non_timestamp_primitive!(u64, UInt64, UInt64Type);
|
||||
define_non_timestamp_primitive!(i8, Int8, Int8Type);
|
||||
define_non_timestamp_primitive!(i16, Int16, Int16Type);
|
||||
define_non_timestamp_primitive!(i32, Int32, Int32Type);
|
||||
define_non_timestamp_primitive!(f32, Float32, Float32Type);
|
||||
define_non_timestamp_primitive!(f64, Float64, Float64Type);
|
||||
define_non_timestamp_primitive!(u8, UInt8, UInt8Type, UInt64Type);
|
||||
define_non_timestamp_primitive!(u16, UInt16, UInt16Type, UInt64Type);
|
||||
define_non_timestamp_primitive!(u32, UInt32, UInt32Type, UInt64Type);
|
||||
define_non_timestamp_primitive!(u64, UInt64, UInt64Type, UInt64Type);
|
||||
define_non_timestamp_primitive!(i8, Int8, Int8Type, Int64Type);
|
||||
define_non_timestamp_primitive!(i16, Int16, Int16Type, Int64Type);
|
||||
define_non_timestamp_primitive!(i32, Int32, Int32Type, Int64Type);
|
||||
define_non_timestamp_primitive!(f32, Float32, Float32Type, Float64Type);
|
||||
define_non_timestamp_primitive!(f64, Float64, Float64Type, Float64Type);
|
||||
|
||||
// Timestamp primitive:
|
||||
define_logical_primitive_type!(i64, Int64, Int64Type);
|
||||
define_logical_primitive_type!(i64, Int64, Int64Type, Int64Type);
|
||||
|
||||
impl DataType for Int64Type {
|
||||
fn name(&self) -> &str {
|
||||
|
||||
@@ -98,6 +98,7 @@ macro_rules! impl_data_type_for_timestamp {
|
||||
type ArrowPrimitive = [<Arrow Timestamp $unit Type>];
|
||||
type Native = i64;
|
||||
type Wrapper = [<Timestamp $unit>];
|
||||
type LargestType = Self;
|
||||
|
||||
fn build_data_type() -> ConcreteDataType {
|
||||
ConcreteDataType::Timestamp(TimestampType::$unit(
|
||||
|
||||
Reference in New Issue
Block a user