diff --git a/src/common/function/src/error.rs b/src/common/function/src/error.rs deleted file mode 100644 index 73c3928a00..0000000000 --- a/src/common/function/src/error.rs +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; - -use common_error::prelude::*; -pub use common_query::error::{Error, Result}; -use datatypes::error::Error as DataTypeError; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum InnerError { - #[snafu(display("Fail to get scalar vector, {}", source))] - GetScalarVector { - source: DataTypeError, - backtrace: Backtrace, - }, -} - -impl ErrorExt for InnerError { - fn backtrace_opt(&self) -> Option<&Backtrace> { - ErrorCompat::backtrace(self) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl From for Error { - fn from(err: InnerError) -> Self { - Self::new(err) - } -} - -#[cfg(test)] -mod tests { - use snafu::GenerateImplicitData; - - use super::*; - - fn raise_datatype_error() -> std::result::Result<(), DataTypeError> { - Err(DataTypeError::Conversion { - from: "test".to_string(), - backtrace: Backtrace::generate(), - }) - } - - #[test] - fn test_get_scalar_vector_error() { - let err: Error = raise_datatype_error() - .context(GetScalarVectorSnafu) - .err() - .unwrap() - .into(); - assert!(err.backtrace_opt().is_some()); - } -} diff --git a/src/common/function/src/lib.rs b/src/common/function/src/lib.rs index 5a1b8edacb..8d15fe0b25 100644 --- a/src/common/function/src/lib.rs +++ b/src/common/function/src/lib.rs @@ -12,5 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod error; pub mod scalars; diff --git a/src/common/function/src/scalars/aggregate/mod.rs b/src/common/function/src/scalars/aggregate.rs similarity index 100% rename from src/common/function/src/scalars/aggregate/mod.rs rename to src/common/function/src/scalars/aggregate.rs diff --git a/src/common/function/src/scalars/aggregate/diff.rs b/src/common/function/src/scalars/aggregate/diff.rs index d0e7ca3406..2e0b38f1e7 100644 --- a/src/common/function/src/scalars/aggregate/diff.rs +++ b/src/common/function/src/scalars/aggregate/diff.rs @@ -22,40 +22,41 @@ use common_query::error::{ use common_query::logical_plan::{Accumulator, AggregateFunctionCreator}; use common_query::prelude::*; use datatypes::prelude::*; -use datatypes::types::PrimitiveType; +use datatypes::types::{LogicalPrimitiveType, WrapperType}; use datatypes::value::ListValue; -use datatypes::vectors::{ConstantVector, ListVector}; +use datatypes::vectors::{ConstantVector, Helper, ListVector}; use datatypes::with_match_primitive_type_id; use num_traits::AsPrimitive; use snafu::{ensure, OptionExt, ResultExt}; // https://numpy.org/doc/stable/reference/generated/numpy.diff.html +// I is the input type, O is the output type. #[derive(Debug, Default)] -pub struct Diff +pub struct Diff where - T: Primitive + AsPrimitive, - SubT: Primitive + std::ops::Sub, + I: WrapperType, + O: WrapperType, { - values: Vec, - _phantom: PhantomData, + values: Vec, + _phantom: PhantomData, } -impl Diff +impl Diff where - T: Primitive + AsPrimitive, - SubT: Primitive + std::ops::Sub, + I: WrapperType, + O: WrapperType, { - fn push(&mut self, value: T) { + fn push(&mut self, value: I) { self.values.push(value); } } -impl Accumulator for Diff +impl Accumulator for Diff where - T: Primitive + AsPrimitive, - for<'a> T: Scalar = T>, - SubT: Primitive + std::ops::Sub, - for<'a> SubT: Scalar = SubT>, + I: WrapperType, + O: WrapperType, + I::Native: AsPrimitive, + O::Native: std::ops::Sub, { fn state(&self) -> Result> { let nums = self @@ -65,7 +66,7 @@ where .collect::>(); Ok(vec![Value::List(ListValue::new( Some(Box::new(nums)), - T::default().into().data_type(), + I::LogicalType::build_data_type(), ))]) } @@ -78,12 +79,12 @@ where let column = &values[0]; let mut len = 1; - let column: &::VectorType = if column.is_const() { + let column: &::VectorType = if column.is_const() { len = column.len(); - let column: &ConstantVector = unsafe { VectorHelper::static_cast(column) }; - unsafe { VectorHelper::static_cast(column.inner()) } + let column: &ConstantVector = unsafe { Helper::static_cast(column) }; + unsafe { Helper::static_cast(column.inner()) } } else { - unsafe { VectorHelper::static_cast(column) } + unsafe { Helper::static_cast(column) } }; (0..len).for_each(|_| { for v in column.iter_data().flatten() { @@ -109,8 +110,9 @@ where ), })?; for state in states.values_iter() { - let state = state.context(FromScalarValueSnafu)?; - self.update_batch(&[state])? + if let Some(state) = state.context(FromScalarValueSnafu)? { + self.update_batch(&[state])?; + } } Ok(()) } @@ -122,11 +124,14 @@ where let diff = self .values .windows(2) - .map(|x| (x[1].as_() - x[0].as_()).into()) + .map(|x| { + let native = x[1].into_native().as_() - x[0].into_native().as_(); + O::from_native(native).into() + }) .collect::>(); let diff = Value::List(ListValue::new( Some(Box::new(diff)), - SubT::default().into().data_type(), + O::LogicalType::build_data_type(), )); Ok(diff) } @@ -143,7 +148,7 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator { with_match_primitive_type_id!( input_type.logical_type_id(), |$S| { - Ok(Box::new(Diff::<$S,<$S as Primitive>::LargestType>::default())) + Ok(Box::new(Diff::<<$S as LogicalPrimitiveType>::Wrapper, <<$S as LogicalPrimitiveType>::LargestType as LogicalPrimitiveType>::Wrapper>::default())) }, { let err_msg = format!( @@ -163,7 +168,7 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator { with_match_primitive_type_id!( input_types[0].logical_type_id(), |$S| { - Ok(ConcreteDataType::list_datatype(PrimitiveType::<<$S as Primitive>::LargestType>::default().into())) + Ok(ConcreteDataType::list_datatype($S::default().into())) }, { unreachable!() @@ -177,7 +182,7 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator { with_match_primitive_type_id!( input_types[0].logical_type_id(), |$S| { - Ok(vec![ConcreteDataType::list_datatype(PrimitiveType::<$S>::default().into())]) + Ok(vec![ConcreteDataType::list_datatype($S::default().into())]) }, { unreachable!() @@ -188,9 +193,10 @@ impl AggregateFunctionCreator for DiffAccumulatorCreator { #[cfg(test)] mod test { - use datatypes::vectors::PrimitiveVector; + use datatypes::vectors::Int32Vector; use super::*; + #[test] fn test_update_batch() { // test update empty batch, expect not updating anything @@ -201,21 +207,19 @@ mod test { // test update one not-null value let mut diff = Diff::::default(); - let v: Vec = vec![Arc::new(PrimitiveVector::::from(vec![Some(42)]))]; + let v: Vec = vec![Arc::new(Int32Vector::from(vec![Some(42)]))]; assert!(diff.update_batch(&v).is_ok()); assert_eq!(Value::Null, diff.evaluate().unwrap()); // test update one null value let mut diff = Diff::::default(); - let v: Vec = vec![Arc::new(PrimitiveVector::::from(vec![ - Option::::None, - ]))]; + let v: Vec = vec![Arc::new(Int32Vector::from(vec![Option::::None]))]; assert!(diff.update_batch(&v).is_ok()); assert_eq!(Value::Null, diff.evaluate().unwrap()); // test update no null-value batch let mut diff = Diff::::default(); - let v: Vec = vec![Arc::new(PrimitiveVector::::from(vec![ + let v: Vec = vec![Arc::new(Int32Vector::from(vec![ Some(-1i32), Some(1), Some(2), @@ -232,7 +236,7 @@ mod test { // test update null-value batch let mut diff = Diff::::default(); - let v: Vec = vec![Arc::new(PrimitiveVector::::from(vec![ + let v: Vec = vec![Arc::new(Int32Vector::from(vec![ Some(-2i32), None, Some(3), @@ -251,7 +255,7 @@ mod test { // test update with constant vector let mut diff = Diff::::default(); let v: Vec = vec![Arc::new(ConstantVector::new( - Arc::new(PrimitiveVector::::from_vec(vec![4])), + Arc::new(Int32Vector::from_vec(vec![4])), 4, ))]; let values = vec![Value::from(0_i64), Value::from(0_i64), Value::from(0_i64)]; diff --git a/src/common/function/src/scalars/expression/mod.rs b/src/common/function/src/scalars/expression.rs similarity index 100% rename from src/common/function/src/scalars/expression/mod.rs rename to src/common/function/src/scalars/expression.rs diff --git a/src/common/function/src/scalars/function.rs b/src/common/function/src/scalars/function.rs index 353f524ea9..6f70bca4a0 100644 --- a/src/common/function/src/scalars/function.rs +++ b/src/common/function/src/scalars/function.rs @@ -16,12 +16,11 @@ use std::fmt; use std::sync::Arc; use chrono_tz::Tz; +use common_query::error::Result; use common_query::prelude::Signature; use datatypes::data_type::ConcreteDataType; use datatypes::vectors::VectorRef; -use crate::error::Result; - #[derive(Clone)] pub struct FunctionContext { pub tz: Tz, diff --git a/src/common/function/src/scalars/math/mod.rs b/src/common/function/src/scalars/math.rs similarity index 100% rename from src/common/function/src/scalars/math/mod.rs rename to src/common/function/src/scalars/math.rs diff --git a/src/common/function/src/scalars/numpy/mod.rs b/src/common/function/src/scalars/numpy.rs similarity index 100% rename from src/common/function/src/scalars/numpy/mod.rs rename to src/common/function/src/scalars/numpy.rs diff --git a/src/common/function/src/scalars/timestamp/mod.rs b/src/common/function/src/scalars/timestamp.rs similarity index 100% rename from src/common/function/src/scalars/timestamp/mod.rs rename to src/common/function/src/scalars/timestamp.rs diff --git a/src/common/function/src/scalars/udf.rs b/src/common/function/src/scalars/udf.rs index b2d47af34d..f6a7dcee87 100644 --- a/src/common/function/src/scalars/udf.rs +++ b/src/common/function/src/scalars/udf.rs @@ -19,7 +19,8 @@ use common_query::prelude::{ ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUdf, ScalarValue, }; use datatypes::error::Error as DataTypeError; -use datatypes::prelude::{ConcreteDataType, VectorHelper}; +use datatypes::prelude::*; +use datatypes::vectors::Helper; use snafu::ResultExt; use crate::scalars::function::{FunctionContext, FunctionRef}; @@ -47,7 +48,7 @@ pub fn create_udf(func: FunctionRef) -> ScalarUdf { let args: Result, DataTypeError> = args .iter() .map(|arg| match arg { - ColumnarValue::Scalar(v) => VectorHelper::try_from_scalar_value(v.clone(), rows), + ColumnarValue::Scalar(v) => Helper::try_from_scalar_value(v.clone(), rows), ColumnarValue::Vector(v) => Ok(v.clone()), }) .collect(); diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index e736214e49..1d1e842d29 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -23,17 +23,9 @@ use datatypes::error::Error as DataTypeError; use datatypes::prelude::ConcreteDataType; use statrs::StatsError; -common_error::define_opaque_error!(Error); - #[derive(Debug, Snafu)] #[snafu(visibility(pub))] -pub enum InnerError { - #[snafu(display("Fail to cast array to {:?}, source: {}", typ, source))] - TypeCast { - source: ArrowError, - typ: arrow::datatypes::DataType, - }, - +pub enum Error { #[snafu(display("Fail to execute function, source: {}", source))] ExecuteFunction { source: DataFusionError, @@ -135,43 +127,56 @@ pub enum InnerError { source: BoxedError, }, + #[snafu(display("Fail to cast array to {:?}, source: {}", typ, source))] + TypeCast { + source: ArrowError, + typ: arrow::datatypes::DataType, + }, + #[snafu(display("Query engine fail to cast value: {}", source))] ToScalarValue { #[snafu(backtrace)] source: DataTypeError, }, + + #[snafu(display("Fail to get scalar vector, {}", source))] + GetScalarVector { + #[snafu(backtrace)] + source: DataTypeError, + }, } pub type Result = std::result::Result; -impl ErrorExt for InnerError { +impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - InnerError::ExecuteFunction { .. } - | InnerError::GenerateFunction { .. } - | InnerError::CreateAccumulator { .. } - | InnerError::DowncastVector { .. } - | InnerError::InvalidInputState { .. } - | InnerError::InvalidInputCol { .. } - | InnerError::BadAccumulatorImpl { .. } - | InnerError::ToScalarValue { .. } => StatusCode::EngineExecuteQuery, + Error::ExecuteFunction { .. } + | Error::GenerateFunction { .. } + | Error::CreateAccumulator { .. } + | Error::DowncastVector { .. } + | Error::InvalidInputState { .. } + | Error::InvalidInputCol { .. } + | Error::BadAccumulatorImpl { .. } + | Error::ToScalarValue { .. } + | Error::GetScalarVector { .. } => StatusCode::EngineExecuteQuery, - InnerError::InvalidInputs { source, .. } - | InnerError::IntoVector { source, .. } - | InnerError::FromScalarValue { source } - | InnerError::ConvertArrowSchema { source } - | InnerError::FromArrowArray { source } => source.status_code(), + Error::InvalidInputs { source, .. } + | Error::IntoVector { source, .. } + | Error::FromScalarValue { source } + | Error::ConvertArrowSchema { source } + | Error::FromArrowArray { source } => source.status_code(), - InnerError::ExecuteRepeatedly { .. } - | InnerError::GeneralDataFusion { .. } - | InnerError::DataFusionExecutionPlan { .. } => StatusCode::Unexpected, + Error::ExecuteRepeatedly { .. } + | Error::GeneralDataFusion { .. } + | Error::DataFusionExecutionPlan { .. } => StatusCode::Unexpected, - InnerError::UnsupportedInputDataType { .. } | InnerError::TypeCast { .. } => { + Error::UnsupportedInputDataType { .. } | Error::TypeCast { .. } => { StatusCode::InvalidArguments } - InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(), - InnerError::ExecutePhysicalPlan { source } => source.status_code(), + Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(), + Error::ExecutePhysicalPlan { source } => source.status_code(), } } @@ -184,12 +189,6 @@ impl ErrorExt for InnerError { } } -impl From for Error { - fn from(e: InnerError) -> Error { - Error::new(e) - } -} - impl From for DataFusionError { fn from(e: Error) -> DataFusionError { DataFusionError::External(Box::new(e)) @@ -198,7 +197,7 @@ impl From for DataFusionError { impl From for Error { fn from(source: BoxedError) -> Self { - InnerError::ExecutePhysicalPlan { source }.into() + Error::ExecutePhysicalPlan { source }.into() } } @@ -214,7 +213,7 @@ mod tests { } fn assert_error(err: &Error, code: StatusCode) { - let inner_err = err.as_any().downcast_ref::().unwrap(); + let inner_err = err.as_any().downcast_ref::().unwrap(); assert_eq!(code, inner_err.status_code()); assert!(inner_err.backtrace_opt().is_some()); } @@ -250,14 +249,14 @@ mod tests { .err() .unwrap() .into(); - assert_eq!(error.inner.status_code(), StatusCode::Unexpected); + assert_eq!(error.status_code(), StatusCode::Unexpected); assert!(error.backtrace_opt().is_some()); } #[test] fn test_convert_df_recordbatch_stream_error() { let result: std::result::Result = - Err(common_recordbatch::error::InnerError::PollStream { + Err(common_recordbatch::error::Error::PollStream { source: ArrowError::DivideByZero, backtrace: Backtrace::generate(), } @@ -267,7 +266,7 @@ mod tests { .err() .unwrap() .into(); - assert_eq!(error.inner.status_code(), StatusCode::Internal); + assert_eq!(error.status_code(), StatusCode::Internal); assert!(error.backtrace_opt().is_some()); } diff --git a/src/common/query/src/logical_plan/accumulator.rs b/src/common/query/src/logical_plan/accumulator.rs index aa97f22f79..cce139094e 100644 --- a/src/common/query/src/logical_plan/accumulator.rs +++ b/src/common/query/src/logical_plan/accumulator.rs @@ -17,12 +17,10 @@ use std::fmt::Debug; use std::sync::Arc; -use common_time::timestamp::TimeUnit; use datafusion_common::Result as DfResult; use datafusion_expr::{Accumulator as DfAccumulator, AggregateState}; use datatypes::arrow::array::ArrayRef; use datatypes::prelude::*; -use datatypes::value::ListValue; use datatypes::vectors::{Helper as VectorHelper, VectorRef}; use snafu::ResultExt; @@ -135,8 +133,7 @@ impl DfAccumulator for DfAccumulatorAdaptor { return error::BadAccumulatorImplSnafu { err_msg: format!("Accumulator {:?} returned state values size do not match its state types size.", self), } - .fail() - .map_err(Error::from)?; + .fail()?; } Ok(state_values .into_iter() @@ -147,31 +144,26 @@ impl DfAccumulator for DfAccumulatorAdaptor { .context(error::ToScalarValueSnafu)?; Ok(AggregateState::Scalar(scalar)) }) - .collect::>>() - .map_err(Error::from)?) + .collect::>>()?) } fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> { - let vectors = VectorHelper::try_into_vectors(values) - .context(FromScalarValueSnafu) - .map_err(Error::from)?; - self.accumulator - .update_batch(&vectors) - .map_err(|e| e.into()) + let vectors = VectorHelper::try_into_vectors(values).context(FromScalarValueSnafu)?; + self.accumulator.update_batch(&vectors)?; + Ok(()) } fn merge_batch(&mut self, states: &[ArrayRef]) -> DfResult<()> { let mut vectors = Vec::with_capacity(states.len()); for array in states.iter() { vectors.push( - VectorHelper::try_into_vector(array) - .context(IntoVectorSnafu { - data_type: array.data_type().clone(), - }) - .map_err(Error::from)?, + VectorHelper::try_into_vector(array).context(IntoVectorSnafu { + data_type: array.data_type().clone(), + })?, ); } - self.accumulator.merge_batch(&vectors).map_err(|e| e.into()) + self.accumulator.merge_batch(&vectors)?; + Ok(()) } fn evaluate(&self) -> DfResult { diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index 8b0f42898c..2284c1b2d7 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -168,8 +168,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter { let schema: SchemaRef = Arc::new( df_schema .try_into() - .context(error::ConvertArrowSchemaSnafu) - .map_err(error::Error::from)?, + .context(error::ConvertArrowSchemaSnafu)?, ); let children = children .into_iter() diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 894c31f4a2..0937441338 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -17,13 +17,12 @@ use std::any::Any; use common_error::ext::BoxedError; use common_error::prelude::*; -common_error::define_opaque_error!(Error); pub type Result = std::result::Result; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] -pub enum InnerError { +pub enum Error { #[snafu(display("Fail to create datafusion record batch, source: {}", source))] NewDfRecordBatch { source: datatypes::arrow::error::ArrowError, @@ -67,19 +66,19 @@ pub enum InnerError { }, } -impl ErrorExt for InnerError { +impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - InnerError::NewDfRecordBatch { .. } => StatusCode::InvalidArguments, + Error::NewDfRecordBatch { .. } => StatusCode::InvalidArguments, - InnerError::DataTypes { .. } - | InnerError::CreateRecordBatches { .. } - | InnerError::PollStream { .. } - | InnerError::Format { .. } => StatusCode::Internal, + Error::DataTypes { .. } + | Error::CreateRecordBatches { .. } + | Error::PollStream { .. } + | Error::Format { .. } => StatusCode::Internal, - InnerError::External { source } => source.status_code(), + Error::External { source } => source.status_code(), - InnerError::SchemaConversion { source, .. } => source.status_code(), + Error::SchemaConversion { source, .. } => source.status_code(), } } @@ -91,9 +90,3 @@ impl ErrorExt for InnerError { self } } - -impl From for Error { - fn from(e: InnerError) -> Error { - Error::new(e) - } -} diff --git a/src/common/recordbatch/src/util.rs b/src/common/recordbatch/src/util.rs index ac781c39f9..1cca3ee988 100644 --- a/src/common/recordbatch/src/util.rs +++ b/src/common/recordbatch/src/util.rs @@ -35,7 +35,7 @@ mod tests { use futures::Stream; use super::*; - use crate::{RecordBatchStream}; + use crate::RecordBatchStream; struct MockRecordBatchStream { batch: Option, diff --git a/src/datatypes/src/types/date_type.rs b/src/datatypes/src/types/date_type.rs index 052b837a3d..afd482359d 100644 --- a/src/datatypes/src/types/date_type.rs +++ b/src/datatypes/src/types/date_type.rs @@ -59,6 +59,7 @@ impl LogicalPrimitiveType for DateType { type ArrowPrimitive = Date32Type; type Native = i32; type Wrapper = Date; + type LargestType = Self; fn build_data_type() -> ConcreteDataType { ConcreteDataType::date_datatype() diff --git a/src/datatypes/src/types/datetime_type.rs b/src/datatypes/src/types/datetime_type.rs index d74a02effe..ccd810eee7 100644 --- a/src/datatypes/src/types/datetime_type.rs +++ b/src/datatypes/src/types/datetime_type.rs @@ -57,6 +57,7 @@ impl LogicalPrimitiveType for DateTimeType { type ArrowPrimitive = Date64Type; type Native = i64; type Wrapper = DateTime; + type LargestType = Self; fn build_data_type() -> ConcreteDataType { ConcreteDataType::datetime_datatype() diff --git a/src/datatypes/src/types/primitive_type.rs b/src/datatypes/src/types/primitive_type.rs index e389ca13bf..c005b89fee 100644 --- a/src/datatypes/src/types/primitive_type.rs +++ b/src/datatypes/src/types/primitive_type.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::cmp::Ordering; +use std::fmt; use arrow::datatypes::{ArrowNativeType, ArrowPrimitiveType, DataType as ArrowDataType}; use common_time::{Date, DateTime}; @@ -29,36 +30,34 @@ use crate::value::{Value, ValueRef}; use crate::vectors::{MutableVector, PrimitiveVector, PrimitiveVectorBuilder, Vector}; /// Data types that can be used as arrow's native type. -pub trait NativeType: ArrowNativeType + NumCast { - /// Largest numeric type this primitive type can be cast to. - type LargestType: NativeType; -} +pub trait NativeType: ArrowNativeType + NumCast {} macro_rules! impl_native_type { - ($Type: ident, $LargestType: ident) => { - impl NativeType for $Type { - type LargestType = $LargestType; - } + ($Type: ident) => { + impl NativeType for $Type {} }; } -impl_native_type!(u8, u64); -impl_native_type!(u16, u64); -impl_native_type!(u32, u64); -impl_native_type!(u64, u64); -impl_native_type!(i8, i64); -impl_native_type!(i16, i64); -impl_native_type!(i32, i64); -impl_native_type!(i64, i64); -impl_native_type!(f32, f64); -impl_native_type!(f64, f64); +impl_native_type!(u8); +impl_native_type!(u16); +impl_native_type!(u32); +impl_native_type!(u64); +impl_native_type!(i8); +impl_native_type!(i16); +impl_native_type!(i32); +impl_native_type!(i64); +impl_native_type!(f32); +impl_native_type!(f64); /// Represents the wrapper type that wraps a native type using the `newtype pattern`, /// such as [Date](`common_time::Date`) is a wrapper type for the underlying native /// type `i32`. pub trait WrapperType: Copy - + Scalar + + Send + + Sync + + fmt::Debug + + for<'a> Scalar = Self> + PartialEq + Into + Into> @@ -87,6 +86,8 @@ pub trait LogicalPrimitiveType: 'static + Sized { type Wrapper: WrapperType + for<'a> Scalar, RefType<'a> = Self::Wrapper> + for<'a> ScalarRef<'a, ScalarType = Self::Wrapper>; + /// Largest type this primitive type can cast to. + type LargestType: LogicalPrimitiveType; /// Construct the data type struct. fn build_data_type() -> ConcreteDataType; @@ -188,7 +189,7 @@ impl WrapperType for DateTime { } macro_rules! define_logical_primitive_type { - ($Native: ident, $TypeId: ident, $DataType: ident) => { + ($Native: ident, $TypeId: ident, $DataType: ident, $Largest: ident) => { // We need to define it as an empty struct `struct DataType {}` instead of a struct-unit // `struct DataType;` to ensure the serialized JSON string is compatible with previous // implementation. @@ -199,6 +200,7 @@ macro_rules! define_logical_primitive_type { type ArrowPrimitive = arrow::datatypes::$DataType; type Native = $Native; type Wrapper = $Native; + type LargestType = $Largest; fn build_data_type() -> ConcreteDataType { ConcreteDataType::$TypeId($DataType::default()) @@ -240,8 +242,8 @@ macro_rules! define_logical_primitive_type { } macro_rules! define_non_timestamp_primitive { - ($Native: ident, $TypeId: ident, $DataType: ident) => { - define_logical_primitive_type!($Native, $TypeId, $DataType); + ($Native: ident, $TypeId: ident, $DataType: ident, $Largest: ident) => { + define_logical_primitive_type!($Native, $TypeId, $DataType, $Largest); impl DataType for $DataType { fn name(&self) -> &str { @@ -271,18 +273,18 @@ macro_rules! define_non_timestamp_primitive { }; } -define_non_timestamp_primitive!(u8, UInt8, UInt8Type); -define_non_timestamp_primitive!(u16, UInt16, UInt16Type); -define_non_timestamp_primitive!(u32, UInt32, UInt32Type); -define_non_timestamp_primitive!(u64, UInt64, UInt64Type); -define_non_timestamp_primitive!(i8, Int8, Int8Type); -define_non_timestamp_primitive!(i16, Int16, Int16Type); -define_non_timestamp_primitive!(i32, Int32, Int32Type); -define_non_timestamp_primitive!(f32, Float32, Float32Type); -define_non_timestamp_primitive!(f64, Float64, Float64Type); +define_non_timestamp_primitive!(u8, UInt8, UInt8Type, UInt64Type); +define_non_timestamp_primitive!(u16, UInt16, UInt16Type, UInt64Type); +define_non_timestamp_primitive!(u32, UInt32, UInt32Type, UInt64Type); +define_non_timestamp_primitive!(u64, UInt64, UInt64Type, UInt64Type); +define_non_timestamp_primitive!(i8, Int8, Int8Type, Int64Type); +define_non_timestamp_primitive!(i16, Int16, Int16Type, Int64Type); +define_non_timestamp_primitive!(i32, Int32, Int32Type, Int64Type); +define_non_timestamp_primitive!(f32, Float32, Float32Type, Float64Type); +define_non_timestamp_primitive!(f64, Float64, Float64Type, Float64Type); // Timestamp primitive: -define_logical_primitive_type!(i64, Int64, Int64Type); +define_logical_primitive_type!(i64, Int64, Int64Type, Int64Type); impl DataType for Int64Type { fn name(&self) -> &str { diff --git a/src/datatypes/src/types/timestamp_type.rs b/src/datatypes/src/types/timestamp_type.rs index 8f38a57b55..ba352bac1e 100644 --- a/src/datatypes/src/types/timestamp_type.rs +++ b/src/datatypes/src/types/timestamp_type.rs @@ -98,6 +98,7 @@ macro_rules! impl_data_type_for_timestamp { type ArrowPrimitive = []; type Native = i64; type Wrapper = []; + type LargestType = Self; fn build_data_type() -> ConcreteDataType { ConcreteDataType::Timestamp(TimestampType::$unit(