From 653906d4fa57c540af9494e451eaf05c2bd77e29 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 6 Dec 2022 16:45:54 +0800 Subject: [PATCH] fix: Fix common::query compiler errors (#713) * feat: Move conversion to ScalarValue to value.rs * fix: Fix common::query compiler errors This commit also makes InnerError pub(crate) --- src/common/query/src/error.rs | 13 +- .../query/src/logical_plan/accumulator.rs | 316 +---------------- src/common/query/src/physical_plan.rs | 1 + src/datatypes/src/data_type.rs | 18 + src/datatypes/src/error.rs | 6 + src/datatypes/src/types/list_type.rs | 7 + src/datatypes/src/value.rs | 317 +++++++++++++++++- 7 files changed, 366 insertions(+), 312 deletions(-) diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index d26fcf3278..31b42f6ebf 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -24,8 +24,8 @@ use statrs::StatsError; common_error::define_opaque_error!(Error); #[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum InnerError { +#[snafu(visibility(pub(crate)))] +pub(crate) enum InnerError { #[snafu(display("Fail to execute function, source: {}", source))] ExecuteFunction { source: DataFusionError, @@ -120,6 +120,12 @@ pub enum InnerError { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Query engine fail to cast value: {}", source))] + ToScalarValue { + #[snafu(backtrace)] + source: DataTypeError, + }, } pub type Result = std::result::Result; @@ -133,7 +139,8 @@ impl ErrorExt for InnerError { | InnerError::DowncastVector { .. } | InnerError::InvalidInputState { .. } | InnerError::InvalidInputCol { .. } - | InnerError::BadAccumulatorImpl { .. } => StatusCode::EngineExecuteQuery, + | InnerError::BadAccumulatorImpl { .. } + | InnerError::ToScalarValue { .. } => StatusCode::EngineExecuteQuery, InnerError::InvalidInputs { source, .. } | InnerError::IntoVector { source, .. 
} diff --git a/src/common/query/src/logical_plan/accumulator.rs b/src/common/query/src/logical_plan/accumulator.rs index 483fb74ac8..aa97f22f79 100644 --- a/src/common/query/src/logical_plan/accumulator.rs +++ b/src/common/query/src/logical_plan/accumulator.rs @@ -142,7 +142,9 @@ impl DfAccumulator for DfAccumulatorAdaptor { .into_iter() .zip(state_types.iter()) .map(|(v, t)| { - let scalar = try_into_scalar_value(v, t)?; + let scalar = v + .try_to_scalar_value(t) + .context(error::ToScalarValueSnafu)?; Ok(AggregateState::Scalar(scalar)) }) .collect::>>() @@ -175,312 +177,10 @@ impl DfAccumulator for DfAccumulatorAdaptor { fn evaluate(&self) -> DfResult { let value = self.accumulator.evaluate()?; let output_type = self.creator.output_type()?; - Ok(try_into_scalar_value(value, &output_type)?) - } -} - -fn try_into_scalar_value(value: Value, datatype: &ConcreteDataType) -> Result { - if !matches!(value, Value::Null) && datatype != &value.data_type() { - return error::BadAccumulatorImplSnafu { - err_msg: format!( - "expect value to return datatype {:?}, actual: {:?}", - datatype, - value.data_type() - ), - } - .fail()?; - } - - Ok(match value { - Value::Boolean(v) => ScalarValue::Boolean(Some(v)), - Value::UInt8(v) => ScalarValue::UInt8(Some(v)), - Value::UInt16(v) => ScalarValue::UInt16(Some(v)), - Value::UInt32(v) => ScalarValue::UInt32(Some(v)), - Value::UInt64(v) => ScalarValue::UInt64(Some(v)), - Value::Int8(v) => ScalarValue::Int8(Some(v)), - Value::Int16(v) => ScalarValue::Int16(Some(v)), - Value::Int32(v) => ScalarValue::Int32(Some(v)), - Value::Int64(v) => ScalarValue::Int64(Some(v)), - Value::Float32(v) => ScalarValue::Float32(Some(v.0)), - Value::Float64(v) => ScalarValue::Float64(Some(v.0)), - Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())), - Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())), - Value::Date(v) => ScalarValue::Date32(Some(v.val())), - Value::DateTime(v) => ScalarValue::Date64(Some(v.val())), - Value::Null 
=> try_convert_null_value(datatype)?, - Value::List(list) => try_convert_list_value(list)?, - Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())), - }) -} - -fn timestamp_to_scalar_value(unit: TimeUnit, val: Option) -> ScalarValue { - match unit { - TimeUnit::Second => ScalarValue::TimestampSecond(val, None), - TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(val, None), - TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(val, None), - TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(val, None), - } -} - -fn try_convert_null_value(datatype: &ConcreteDataType) -> Result { - Ok(match datatype { - ConcreteDataType::Boolean(_) => ScalarValue::Boolean(None), - ConcreteDataType::Int8(_) => ScalarValue::Int8(None), - ConcreteDataType::Int16(_) => ScalarValue::Int16(None), - ConcreteDataType::Int32(_) => ScalarValue::Int32(None), - ConcreteDataType::Int64(_) => ScalarValue::Int64(None), - ConcreteDataType::UInt8(_) => ScalarValue::UInt8(None), - ConcreteDataType::UInt16(_) => ScalarValue::UInt16(None), - ConcreteDataType::UInt32(_) => ScalarValue::UInt32(None), - ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None), - ConcreteDataType::Float32(_) => ScalarValue::Float32(None), - ConcreteDataType::Float64(_) => ScalarValue::Float64(None), - ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None), - ConcreteDataType::String(_) => ScalarValue::Utf8(None), - ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None), - _ => { - return error::BadAccumulatorImplSnafu { - err_msg: format!( - "undefined transition from null value to datatype {:?}", - datatype - ), - } - .fail()? 
- } - }) -} - -fn try_convert_list_value(list: ListValue) -> Result { - let vs = if let Some(items) = list.items() { - Some(Box::new( - items - .iter() - .map(|v| try_into_scalar_value(v.clone(), list.datatype())) - .collect::>>()?, - )) - } else { - None - }; - Ok(ScalarValue::List( - vs, - Box::new(list.datatype().as_arrow_type()), - )) -} - -#[cfg(test)] -mod tests { - use common_base::bytes::{Bytes, StringBytes}; - use datafusion_common::ScalarValue; - use datatypes::arrow::datatypes::DataType; - use datatypes::value::{ListValue, OrderedFloat}; - - use super::*; - - #[test] - fn test_not_null_value_to_scalar_value() { - assert_eq!( - ScalarValue::Boolean(Some(true)), - try_into_scalar_value(Value::Boolean(true), &ConcreteDataType::boolean_datatype()) - .unwrap() - ); - assert_eq!( - ScalarValue::Boolean(Some(false)), - try_into_scalar_value(Value::Boolean(false), &ConcreteDataType::boolean_datatype()) - .unwrap() - ); - assert_eq!( - ScalarValue::UInt8(Some(u8::MIN + 1)), - try_into_scalar_value( - Value::UInt8(u8::MIN + 1), - &ConcreteDataType::uint8_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::UInt16(Some(u16::MIN + 2)), - try_into_scalar_value( - Value::UInt16(u16::MIN + 2), - &ConcreteDataType::uint16_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::UInt32(Some(u32::MIN + 3)), - try_into_scalar_value( - Value::UInt32(u32::MIN + 3), - &ConcreteDataType::uint32_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::UInt64(Some(u64::MIN + 4)), - try_into_scalar_value( - Value::UInt64(u64::MIN + 4), - &ConcreteDataType::uint64_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::Int8(Some(i8::MIN + 4)), - try_into_scalar_value(Value::Int8(i8::MIN + 4), &ConcreteDataType::int8_datatype()) - .unwrap() - ); - assert_eq!( - ScalarValue::Int16(Some(i16::MIN + 5)), - try_into_scalar_value( - Value::Int16(i16::MIN + 5), - &ConcreteDataType::int16_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::Int32(Some(i32::MIN 
+ 6)), - try_into_scalar_value( - Value::Int32(i32::MIN + 6), - &ConcreteDataType::int32_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::Int64(Some(i64::MIN + 7)), - try_into_scalar_value( - Value::Int64(i64::MIN + 7), - &ConcreteDataType::int64_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::Float32(Some(8.0f32)), - try_into_scalar_value( - Value::Float32(OrderedFloat(8.0f32)), - &ConcreteDataType::float32_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::Float64(Some(9.0f64)), - try_into_scalar_value( - Value::Float64(OrderedFloat(9.0f64)), - &ConcreteDataType::float64_datatype() - ) - .unwrap() - ); - assert_eq!( - ScalarValue::Utf8(Some("hello".to_string())), - try_into_scalar_value( - Value::String(StringBytes::from("hello")), - &ConcreteDataType::string_datatype(), - ) - .unwrap() - ); - assert_eq!( - ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())), - try_into_scalar_value( - Value::Binary(Bytes::from("world".as_bytes())), - &ConcreteDataType::binary_datatype() - ) - .unwrap() - ); - } - - #[test] - fn test_null_value_to_scalar_value() { - assert_eq!( - ScalarValue::Boolean(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::boolean_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::UInt8(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::uint8_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::UInt16(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::uint16_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::UInt32(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::uint32_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::UInt64(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::uint64_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::Int8(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::int8_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::Int16(None), - try_into_scalar_value(Value::Null, 
&ConcreteDataType::int16_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::Int32(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::int32_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::Int64(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::int64_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::Float32(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::float32_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::Float64(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::float64_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::Utf8(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::string_datatype()).unwrap() - ); - assert_eq!( - ScalarValue::LargeBinary(None), - try_into_scalar_value(Value::Null, &ConcreteDataType::binary_datatype()).unwrap() - ); - } - - #[test] - fn test_list_value_to_scalar_value() { - let items = Some(Box::new(vec![Value::Int32(-1), Value::Null])); - let list = Value::List(ListValue::new(items, ConcreteDataType::int32_datatype())); - let df_list = try_into_scalar_value( - list, - &ConcreteDataType::list_datatype(ConcreteDataType::int32_datatype()), - ) - .unwrap(); - assert!(matches!(df_list, ScalarValue::List(_, _))); - match df_list { - ScalarValue::List(vs, datatype) => { - assert_eq!(*datatype, DataType::Int32); - - assert!(vs.is_some()); - let vs = *vs.unwrap(); - assert_eq!( - vs, - vec![ScalarValue::Int32(Some(-1)), ScalarValue::Int32(None)] - ); - } - _ => unreachable!(), - } - } - - #[test] - pub fn test_timestamp_to_scalar_value() { - assert_eq!( - ScalarValue::TimestampSecond(Some(1), None), - timestamp_to_scalar_value(TimeUnit::Second, Some(1)) - ); - assert_eq!( - ScalarValue::TimestampMillisecond(Some(1), None), - timestamp_to_scalar_value(TimeUnit::Millisecond, Some(1)) - ); - assert_eq!( - ScalarValue::TimestampMicrosecond(Some(1), None), - timestamp_to_scalar_value(TimeUnit::Microsecond, Some(1)) - ); - assert_eq!( - 
ScalarValue::TimestampNanosecond(Some(1), None), - timestamp_to_scalar_value(TimeUnit::Nanosecond, Some(1)) - ); + let scalar_value = value + .try_to_scalar_value(&output_type) + .context(error::ToScalarValueSnafu) + .map_err(Error::from)?; + Ok(scalar_value) } } diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index 9dae297b59..8b0f42898c 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -110,6 +110,7 @@ impl PhysicalPlan for PhysicalPlanAdapter { .collect(); let plan = self .df_plan + .clone() .with_new_children(children) .context(error::GeneralDataFusionSnafu)?; Ok(Arc::new(PhysicalPlanAdapter::new(self.schema(), plan))) diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 0d06d566b6..9e4641defa 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -134,6 +134,14 @@ impl ConcreteDataType { pub fn is_null(&self) -> bool { matches!(self, ConcreteDataType::Null(NullType)) } + + /// Try to cast the type as a [`ListType`]. 
+ pub fn as_list(&self) -> Option<&ListType> { + match self { + ConcreteDataType::List(t) => Some(t), + _ => None, + } + } } impl TryFrom<&ArrowDataType> for ConcreteDataType { @@ -483,4 +491,14 @@ mod tests { let nums = ConcreteDataType::numerics(); assert_eq!(10, nums.len()); } + + #[test] + fn test_as_list() { + let list_type = ConcreteDataType::list_datatype(ConcreteDataType::int32_datatype()); + assert_eq!( + ListType::new(ConcreteDataType::int32_datatype()), + *list_type.as_list().unwrap() + ); + assert!(ConcreteDataType::int32_datatype().as_list().is_none()); + } } diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 50b49cf2b4..2cb8553a90 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -99,6 +99,12 @@ pub enum Error { #[snafu(display("Duplicated metadata for {}", key))] DuplicateMeta { key: String, backtrace: Backtrace }, + + #[snafu(display("Failed to convert value into scalar value, reason: {}", reason))] + ToScalarValue { + reason: String, + backtrace: Backtrace, + }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/types/list_type.rs b/src/datatypes/src/types/list_type.rs index b9875ca362..3c8535810d 100644 --- a/src/datatypes/src/types/list_type.rs +++ b/src/datatypes/src/types/list_type.rs @@ -41,6 +41,12 @@ impl ListType { item_type: Box::new(item_type), } } + + /// Returns the item data type. 
+ #[inline] + pub fn item_type(&self) -> &ConcreteDataType { + &self.item_type + } } impl DataType for ListType { @@ -91,5 +97,6 @@ mod tests { ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Boolean, true))), t.as_arrow_type() ); + assert_eq!(ConcreteDataType::boolean_datatype(), *t.item_type()); } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index bade88d419..911f8600de 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::fmt::{Display, Formatter}; +use arrow::datatypes::{DataType as ArrowDataType, Field}; use common_base::bytes::{Bytes, StringBytes}; use common_time::date::Date; use common_time::datetime::DateTime; @@ -22,10 +23,12 @@ use common_time::timestamp::{TimeUnit, Timestamp}; use datafusion_common::ScalarValue; pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; +use snafu::ensure; use crate::error::{self, Result}; use crate::prelude::*; use crate::type_id::LogicalTypeId; +use crate::types::ListType; use crate::vectors::ListVector; pub type OrderedF32 = OrderedFloat; @@ -110,7 +113,6 @@ impl Value { /// # Panics /// Panics if the data type is not supported. pub fn data_type(&self) -> ConcreteDataType { - // TODO(yingwen): Implement this once all data types are implemented. match self { Value::Null => ConcreteDataType::null_datatype(), Value::Boolean(_) => ConcreteDataType::boolean_datatype(), @@ -202,6 +204,87 @@ impl Value { }, } } + + /// Convert the value into [`ScalarValue`] according to the `output_type`. + pub fn try_to_scalar_value(&self, output_type: &ConcreteDataType) -> Result { + // Compare logical type, since value might not contain full type information. 
+ let value_type_id = self.logical_type_id(); + let output_type_id = output_type.logical_type_id(); + ensure!( + output_type_id == value_type_id || self.is_null(), + error::ToScalarValueSnafu { + reason: format!( + "expect value to return output_type {:?}, actual: {:?}", + output_type_id, value_type_id, + ), + } + ); + + let scalar_value = match self { + Value::Boolean(v) => ScalarValue::Boolean(Some(*v)), + Value::UInt8(v) => ScalarValue::UInt8(Some(*v)), + Value::UInt16(v) => ScalarValue::UInt16(Some(*v)), + Value::UInt32(v) => ScalarValue::UInt32(Some(*v)), + Value::UInt64(v) => ScalarValue::UInt64(Some(*v)), + Value::Int8(v) => ScalarValue::Int8(Some(*v)), + Value::Int16(v) => ScalarValue::Int16(Some(*v)), + Value::Int32(v) => ScalarValue::Int32(Some(*v)), + Value::Int64(v) => ScalarValue::Int64(Some(*v)), + Value::Float32(v) => ScalarValue::Float32(Some(v.0)), + Value::Float64(v) => ScalarValue::Float64(Some(v.0)), + Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())), + Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())), + Value::Date(v) => ScalarValue::Date32(Some(v.val())), + Value::DateTime(v) => ScalarValue::Date64(Some(v.val())), + Value::Null => to_null_value(output_type), + Value::List(list) => { + // Safety: The logical type of the value and output_type are the same. + let list_type = output_type.as_list().unwrap(); + list.try_to_scalar_value(list_type)? 
+ } + Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())), + }; + + Ok(scalar_value) + } +} + +fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue { + match output_type { + ConcreteDataType::Null(_) => ScalarValue::Null, + ConcreteDataType::Boolean(_) => ScalarValue::Boolean(None), + ConcreteDataType::Int8(_) => ScalarValue::Int8(None), + ConcreteDataType::Int16(_) => ScalarValue::Int16(None), + ConcreteDataType::Int32(_) => ScalarValue::Int32(None), + ConcreteDataType::Int64(_) => ScalarValue::Int64(None), + ConcreteDataType::UInt8(_) => ScalarValue::UInt8(None), + ConcreteDataType::UInt16(_) => ScalarValue::UInt16(None), + ConcreteDataType::UInt32(_) => ScalarValue::UInt32(None), + ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None), + ConcreteDataType::Float32(_) => ScalarValue::Float32(None), + ConcreteDataType::Float64(_) => ScalarValue::Float64(None), + ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None), + ConcreteDataType::String(_) => ScalarValue::Utf8(None), + ConcreteDataType::Date(_) => ScalarValue::Date32(None), + ConcreteDataType::DateTime(_) => ScalarValue::Date64(None), + ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None), + ConcreteDataType::List(_) => { + ScalarValue::List(None, Box::new(new_item_field(output_type.as_arrow_type()))) + } + } +} + +fn new_item_field(data_type: ArrowDataType) -> Field { + Field::new("item", data_type, false) +} + +fn timestamp_to_scalar_value(unit: TimeUnit, val: Option) -> ScalarValue { + match unit { + TimeUnit::Second => ScalarValue::TimestampSecond(val, None), + TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(val, None), + TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(val, None), + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(val, None), + } } macro_rules! 
impl_ord_for_value_like { @@ -366,6 +449,24 @@ impl ListValue { pub fn datatype(&self) -> &ConcreteDataType { &self.datatype } + + fn try_to_scalar_value(&self, output_type: &ListType) -> Result { + let vs = if let Some(items) = self.items() { + Some( + items + .iter() + .map(|v| v.try_to_scalar_value(output_type.item_type())) + .collect::>>()?, + ) + } else { + None + }; + + Ok(ScalarValue::List( + vs, + Box::new(new_item_field(output_type.as_arrow_type())), + )) + } } impl Default for ListValue { @@ -1272,4 +1373,218 @@ mod tests { "TimestampNanosecondType[]" ); } + + #[test] + fn test_not_null_value_to_scalar_value() { + assert_eq!( + ScalarValue::Boolean(Some(true)), + Value::Boolean(true) + .try_to_scalar_value(&ConcreteDataType::boolean_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Boolean(Some(false)), + Value::Boolean(false) + .try_to_scalar_value(&ConcreteDataType::boolean_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt8(Some(u8::MIN + 1)), + Value::UInt8(u8::MIN + 1) + .try_to_scalar_value(&ConcreteDataType::uint8_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt16(Some(u16::MIN + 2)), + Value::UInt16(u16::MIN + 2) + .try_to_scalar_value(&ConcreteDataType::uint16_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt32(Some(u32::MIN + 3)), + Value::UInt32(u32::MIN + 3) + .try_to_scalar_value(&ConcreteDataType::uint32_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt64(Some(u64::MIN + 4)), + Value::UInt64(u64::MIN + 4) + .try_to_scalar_value(&ConcreteDataType::uint64_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Int8(Some(i8::MIN + 4)), + Value::Int8(i8::MIN + 4) + .try_to_scalar_value(&ConcreteDataType::int8_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Int16(Some(i16::MIN + 5)), + Value::Int16(i16::MIN + 5) + .try_to_scalar_value(&ConcreteDataType::int16_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Int32(Some(i32::MIN + 6)), + Value::Int32(i32::MIN + 6) + 
.try_to_scalar_value(&ConcreteDataType::int32_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Int64(Some(i64::MIN + 7)), + Value::Int64(i64::MIN + 7) + .try_to_scalar_value(&ConcreteDataType::int64_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Float32(Some(8.0f32)), + Value::Float32(OrderedFloat(8.0f32)) + .try_to_scalar_value(&ConcreteDataType::float32_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Float64(Some(9.0f64)), + Value::Float64(OrderedFloat(9.0f64)) + .try_to_scalar_value(&ConcreteDataType::float64_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Utf8(Some("hello".to_string())), + Value::String(StringBytes::from("hello")) + .try_to_scalar_value(&ConcreteDataType::string_datatype(),) + .unwrap() + ); + assert_eq!( + ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())), + Value::Binary(Bytes::from("world".as_bytes())) + .try_to_scalar_value(&ConcreteDataType::binary_datatype()) + .unwrap() + ); + } + + #[test] + fn test_null_value_to_scalar_value() { + assert_eq!( + ScalarValue::Boolean(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::boolean_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt8(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::uint8_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt16(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::uint16_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt32(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::uint32_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::UInt64(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::uint64_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Int8(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::int8_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Int16(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::int16_datatype()) + .unwrap() + ); + assert_eq!( + 
ScalarValue::Int32(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::int32_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Int64(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::int64_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Float32(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::float32_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Float64(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::float64_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::Utf8(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::string_datatype()) + .unwrap() + ); + assert_eq!( + ScalarValue::LargeBinary(None), + Value::Null + .try_to_scalar_value(&ConcreteDataType::binary_datatype()) + .unwrap() + ); + } + + #[test] + fn test_list_value_to_scalar_value() { + let items = Some(Box::new(vec![Value::Int32(-1), Value::Null])); + let list = Value::List(ListValue::new(items, ConcreteDataType::int32_datatype())); + let df_list = list + .try_to_scalar_value(&ConcreteDataType::list_datatype( + ConcreteDataType::int32_datatype(), + )) + .unwrap(); + assert!(matches!(df_list, ScalarValue::List(_, _))); + match df_list { + ScalarValue::List(vs, field) => { + assert_eq!(ArrowDataType::Int32, *field.data_type()); + + let vs = vs.unwrap(); + assert_eq!( + vs, + vec![ScalarValue::Int32(Some(-1)), ScalarValue::Int32(None)] + ); + } + _ => unreachable!(), + } + } + + #[test] + fn test_timestamp_to_scalar_value() { + assert_eq!( + ScalarValue::TimestampSecond(Some(1), None), + timestamp_to_scalar_value(TimeUnit::Second, Some(1)) + ); + assert_eq!( + ScalarValue::TimestampMillisecond(Some(1), None), + timestamp_to_scalar_value(TimeUnit::Millisecond, Some(1)) + ); + assert_eq!( + ScalarValue::TimestampMicrosecond(Some(1), None), + timestamp_to_scalar_value(TimeUnit::Microsecond, Some(1)) + ); + assert_eq!( + ScalarValue::TimestampNanosecond(Some(1), None), + 
timestamp_to_scalar_value(TimeUnit::Nanosecond, Some(1)) + ); + } }