mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 04:12:55 +00:00
fix: Fix common::query compiler errors (#713)
* feat: Move conversion to ScalarValue to value.rs * fix: Fix common::query compiler errors This commit also make InnerError pub(crate)
This commit is contained in:
@@ -24,8 +24,8 @@ use statrs::StatsError;
|
||||
common_error::define_opaque_error!(Error);
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum InnerError {
|
||||
#[snafu(visibility(pub(crate)))]
|
||||
pub(crate) enum InnerError {
|
||||
#[snafu(display("Fail to execute function, source: {}", source))]
|
||||
ExecuteFunction {
|
||||
source: DataFusionError,
|
||||
@@ -120,6 +120,12 @@ pub enum InnerError {
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Query engine fail to cast value: {}", source))]
|
||||
ToScalarValue {
|
||||
#[snafu(backtrace)]
|
||||
source: DataTypeError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -133,7 +139,8 @@ impl ErrorExt for InnerError {
|
||||
| InnerError::DowncastVector { .. }
|
||||
| InnerError::InvalidInputState { .. }
|
||||
| InnerError::InvalidInputCol { .. }
|
||||
| InnerError::BadAccumulatorImpl { .. } => StatusCode::EngineExecuteQuery,
|
||||
| InnerError::BadAccumulatorImpl { .. }
|
||||
| InnerError::ToScalarValue { .. } => StatusCode::EngineExecuteQuery,
|
||||
|
||||
InnerError::InvalidInputs { source, .. }
|
||||
| InnerError::IntoVector { source, .. }
|
||||
|
||||
@@ -142,7 +142,9 @@ impl DfAccumulator for DfAccumulatorAdaptor {
|
||||
.into_iter()
|
||||
.zip(state_types.iter())
|
||||
.map(|(v, t)| {
|
||||
let scalar = try_into_scalar_value(v, t)?;
|
||||
let scalar = v
|
||||
.try_to_scalar_value(t)
|
||||
.context(error::ToScalarValueSnafu)?;
|
||||
Ok(AggregateState::Scalar(scalar))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
@@ -175,312 +177,10 @@ impl DfAccumulator for DfAccumulatorAdaptor {
|
||||
fn evaluate(&self) -> DfResult<ScalarValue> {
|
||||
let value = self.accumulator.evaluate()?;
|
||||
let output_type = self.creator.output_type()?;
|
||||
Ok(try_into_scalar_value(value, &output_type)?)
|
||||
}
|
||||
}
|
||||
|
||||
fn try_into_scalar_value(value: Value, datatype: &ConcreteDataType) -> Result<ScalarValue> {
|
||||
if !matches!(value, Value::Null) && datatype != &value.data_type() {
|
||||
return error::BadAccumulatorImplSnafu {
|
||||
err_msg: format!(
|
||||
"expect value to return datatype {:?}, actual: {:?}",
|
||||
datatype,
|
||||
value.data_type()
|
||||
),
|
||||
}
|
||||
.fail()?;
|
||||
}
|
||||
|
||||
Ok(match value {
|
||||
Value::Boolean(v) => ScalarValue::Boolean(Some(v)),
|
||||
Value::UInt8(v) => ScalarValue::UInt8(Some(v)),
|
||||
Value::UInt16(v) => ScalarValue::UInt16(Some(v)),
|
||||
Value::UInt32(v) => ScalarValue::UInt32(Some(v)),
|
||||
Value::UInt64(v) => ScalarValue::UInt64(Some(v)),
|
||||
Value::Int8(v) => ScalarValue::Int8(Some(v)),
|
||||
Value::Int16(v) => ScalarValue::Int16(Some(v)),
|
||||
Value::Int32(v) => ScalarValue::Int32(Some(v)),
|
||||
Value::Int64(v) => ScalarValue::Int64(Some(v)),
|
||||
Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
|
||||
Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
|
||||
Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
|
||||
Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
|
||||
Value::Date(v) => ScalarValue::Date32(Some(v.val())),
|
||||
Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
|
||||
Value::Null => try_convert_null_value(datatype)?,
|
||||
Value::List(list) => try_convert_list_value(list)?,
|
||||
Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
|
||||
})
|
||||
}
|
||||
|
||||
fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
|
||||
match unit {
|
||||
TimeUnit::Second => ScalarValue::TimestampSecond(val, None),
|
||||
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(val, None),
|
||||
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(val, None),
|
||||
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(val, None),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_convert_null_value(datatype: &ConcreteDataType) -> Result<ScalarValue> {
|
||||
Ok(match datatype {
|
||||
ConcreteDataType::Boolean(_) => ScalarValue::Boolean(None),
|
||||
ConcreteDataType::Int8(_) => ScalarValue::Int8(None),
|
||||
ConcreteDataType::Int16(_) => ScalarValue::Int16(None),
|
||||
ConcreteDataType::Int32(_) => ScalarValue::Int32(None),
|
||||
ConcreteDataType::Int64(_) => ScalarValue::Int64(None),
|
||||
ConcreteDataType::UInt8(_) => ScalarValue::UInt8(None),
|
||||
ConcreteDataType::UInt16(_) => ScalarValue::UInt16(None),
|
||||
ConcreteDataType::UInt32(_) => ScalarValue::UInt32(None),
|
||||
ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
|
||||
ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
|
||||
ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
|
||||
ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
|
||||
ConcreteDataType::String(_) => ScalarValue::Utf8(None),
|
||||
ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None),
|
||||
_ => {
|
||||
return error::BadAccumulatorImplSnafu {
|
||||
err_msg: format!(
|
||||
"undefined transition from null value to datatype {:?}",
|
||||
datatype
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn try_convert_list_value(list: ListValue) -> Result<ScalarValue> {
|
||||
let vs = if let Some(items) = list.items() {
|
||||
Some(Box::new(
|
||||
items
|
||||
.iter()
|
||||
.map(|v| try_into_scalar_value(v.clone(), list.datatype()))
|
||||
.collect::<Result<Vec<_>>>()?,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(ScalarValue::List(
|
||||
vs,
|
||||
Box::new(list.datatype().as_arrow_type()),
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_base::bytes::{Bytes, StringBytes};
|
||||
use datafusion_common::ScalarValue;
|
||||
use datatypes::arrow::datatypes::DataType;
|
||||
use datatypes::value::{ListValue, OrderedFloat};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_not_null_value_to_scalar_value() {
|
||||
assert_eq!(
|
||||
ScalarValue::Boolean(Some(true)),
|
||||
try_into_scalar_value(Value::Boolean(true), &ConcreteDataType::boolean_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Boolean(Some(false)),
|
||||
try_into_scalar_value(Value::Boolean(false), &ConcreteDataType::boolean_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt8(Some(u8::MIN + 1)),
|
||||
try_into_scalar_value(
|
||||
Value::UInt8(u8::MIN + 1),
|
||||
&ConcreteDataType::uint8_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt16(Some(u16::MIN + 2)),
|
||||
try_into_scalar_value(
|
||||
Value::UInt16(u16::MIN + 2),
|
||||
&ConcreteDataType::uint16_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt32(Some(u32::MIN + 3)),
|
||||
try_into_scalar_value(
|
||||
Value::UInt32(u32::MIN + 3),
|
||||
&ConcreteDataType::uint32_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt64(Some(u64::MIN + 4)),
|
||||
try_into_scalar_value(
|
||||
Value::UInt64(u64::MIN + 4),
|
||||
&ConcreteDataType::uint64_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int8(Some(i8::MIN + 4)),
|
||||
try_into_scalar_value(Value::Int8(i8::MIN + 4), &ConcreteDataType::int8_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int16(Some(i16::MIN + 5)),
|
||||
try_into_scalar_value(
|
||||
Value::Int16(i16::MIN + 5),
|
||||
&ConcreteDataType::int16_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int32(Some(i32::MIN + 6)),
|
||||
try_into_scalar_value(
|
||||
Value::Int32(i32::MIN + 6),
|
||||
&ConcreteDataType::int32_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int64(Some(i64::MIN + 7)),
|
||||
try_into_scalar_value(
|
||||
Value::Int64(i64::MIN + 7),
|
||||
&ConcreteDataType::int64_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float32(Some(8.0f32)),
|
||||
try_into_scalar_value(
|
||||
Value::Float32(OrderedFloat(8.0f32)),
|
||||
&ConcreteDataType::float32_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float64(Some(9.0f64)),
|
||||
try_into_scalar_value(
|
||||
Value::Float64(OrderedFloat(9.0f64)),
|
||||
&ConcreteDataType::float64_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Utf8(Some("hello".to_string())),
|
||||
try_into_scalar_value(
|
||||
Value::String(StringBytes::from("hello")),
|
||||
&ConcreteDataType::string_datatype(),
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
|
||||
try_into_scalar_value(
|
||||
Value::Binary(Bytes::from("world".as_bytes())),
|
||||
&ConcreteDataType::binary_datatype()
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_null_value_to_scalar_value() {
|
||||
assert_eq!(
|
||||
ScalarValue::Boolean(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::boolean_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt8(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::uint8_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt16(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::uint16_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt32(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::uint32_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt64(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::uint64_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int8(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::int8_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int16(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::int16_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int32(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::int32_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int64(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::int64_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float32(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::float32_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float64(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::float64_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Utf8(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::string_datatype()).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::LargeBinary(None),
|
||||
try_into_scalar_value(Value::Null, &ConcreteDataType::binary_datatype()).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_value_to_scalar_value() {
|
||||
let items = Some(Box::new(vec![Value::Int32(-1), Value::Null]));
|
||||
let list = Value::List(ListValue::new(items, ConcreteDataType::int32_datatype()));
|
||||
let df_list = try_into_scalar_value(
|
||||
list,
|
||||
&ConcreteDataType::list_datatype(ConcreteDataType::int32_datatype()),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(df_list, ScalarValue::List(_, _)));
|
||||
match df_list {
|
||||
ScalarValue::List(vs, datatype) => {
|
||||
assert_eq!(*datatype, DataType::Int32);
|
||||
|
||||
assert!(vs.is_some());
|
||||
let vs = *vs.unwrap();
|
||||
assert_eq!(
|
||||
vs,
|
||||
vec![ScalarValue::Int32(Some(-1)), ScalarValue::Int32(None)]
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_timestamp_to_scalar_value() {
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampSecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Second, Some(1))
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampMillisecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Millisecond, Some(1))
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampMicrosecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Microsecond, Some(1))
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampNanosecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Nanosecond, Some(1))
|
||||
);
|
||||
let scalar_value = value
|
||||
.try_to_scalar_value(&output_type)
|
||||
.context(error::ToScalarValueSnafu)
|
||||
.map_err(Error::from)?;
|
||||
Ok(scalar_value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,6 +110,7 @@ impl PhysicalPlan for PhysicalPlanAdapter {
|
||||
.collect();
|
||||
let plan = self
|
||||
.df_plan
|
||||
.clone()
|
||||
.with_new_children(children)
|
||||
.context(error::GeneralDataFusionSnafu)?;
|
||||
Ok(Arc::new(PhysicalPlanAdapter::new(self.schema(), plan)))
|
||||
|
||||
@@ -134,6 +134,14 @@ impl ConcreteDataType {
|
||||
pub fn is_null(&self) -> bool {
|
||||
matches!(self, ConcreteDataType::Null(NullType))
|
||||
}
|
||||
|
||||
/// Try to cast the type as a [`ListType`].
|
||||
pub fn as_list(&self) -> Option<&ListType> {
|
||||
match self {
|
||||
ConcreteDataType::List(t) => Some(t),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&ArrowDataType> for ConcreteDataType {
|
||||
@@ -483,4 +491,14 @@ mod tests {
|
||||
let nums = ConcreteDataType::numerics();
|
||||
assert_eq!(10, nums.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_as_list() {
|
||||
let list_type = ConcreteDataType::list_datatype(ConcreteDataType::int32_datatype());
|
||||
assert_eq!(
|
||||
ListType::new(ConcreteDataType::int32_datatype()),
|
||||
*list_type.as_list().unwrap()
|
||||
);
|
||||
assert!(ConcreteDataType::int32_datatype().as_list().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,6 +99,12 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Duplicated metadata for {}", key))]
|
||||
DuplicateMeta { key: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to convert value into scalar value, reason: {}", reason))]
|
||||
ToScalarValue {
|
||||
reason: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
|
||||
@@ -41,6 +41,12 @@ impl ListType {
|
||||
item_type: Box::new(item_type),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the item data type.
|
||||
#[inline]
|
||||
pub fn item_type(&self) -> &ConcreteDataType {
|
||||
&self.item_type
|
||||
}
|
||||
}
|
||||
|
||||
impl DataType for ListType {
|
||||
@@ -91,5 +97,6 @@ mod tests {
|
||||
ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Boolean, true))),
|
||||
t.as_arrow_type()
|
||||
);
|
||||
assert_eq!(ConcreteDataType::boolean_datatype(), *t.item_type());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
||||
use arrow::datatypes::{DataType as ArrowDataType, Field};
|
||||
use common_base::bytes::{Bytes, StringBytes};
|
||||
use common_time::date::Date;
|
||||
use common_time::datetime::DateTime;
|
||||
@@ -22,10 +23,12 @@ use common_time::timestamp::{TimeUnit, Timestamp};
|
||||
use datafusion_common::ScalarValue;
|
||||
pub use ordered_float::OrderedFloat;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::prelude::*;
|
||||
use crate::type_id::LogicalTypeId;
|
||||
use crate::types::ListType;
|
||||
use crate::vectors::ListVector;
|
||||
|
||||
pub type OrderedF32 = OrderedFloat<f32>;
|
||||
@@ -110,7 +113,6 @@ impl Value {
|
||||
/// # Panics
|
||||
/// Panics if the data type is not supported.
|
||||
pub fn data_type(&self) -> ConcreteDataType {
|
||||
// TODO(yingwen): Implement this once all data types are implemented.
|
||||
match self {
|
||||
Value::Null => ConcreteDataType::null_datatype(),
|
||||
Value::Boolean(_) => ConcreteDataType::boolean_datatype(),
|
||||
@@ -202,6 +204,87 @@ impl Value {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert the value into [`ScalarValue`] according to the `output_type`.
|
||||
pub fn try_to_scalar_value(&self, output_type: &ConcreteDataType) -> Result<ScalarValue> {
|
||||
// Compare logical type, since value might not contains full type information.
|
||||
let value_type_id = self.logical_type_id();
|
||||
let output_type_id = output_type.logical_type_id();
|
||||
ensure!(
|
||||
output_type_id == value_type_id || self.is_null(),
|
||||
error::ToScalarValueSnafu {
|
||||
reason: format!(
|
||||
"expect value to return output_type {:?}, actual: {:?}",
|
||||
output_type_id, value_type_id,
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let scalar_value = match self {
|
||||
Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
|
||||
Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
|
||||
Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
|
||||
Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
|
||||
Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
|
||||
Value::Int8(v) => ScalarValue::Int8(Some(*v)),
|
||||
Value::Int16(v) => ScalarValue::Int16(Some(*v)),
|
||||
Value::Int32(v) => ScalarValue::Int32(Some(*v)),
|
||||
Value::Int64(v) => ScalarValue::Int64(Some(*v)),
|
||||
Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
|
||||
Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
|
||||
Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
|
||||
Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
|
||||
Value::Date(v) => ScalarValue::Date32(Some(v.val())),
|
||||
Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
|
||||
Value::Null => to_null_value(output_type),
|
||||
Value::List(list) => {
|
||||
// Safety: The logical type of the value and output_type are the same.
|
||||
let list_type = output_type.as_list().unwrap();
|
||||
list.try_to_scalar_value(list_type)?
|
||||
}
|
||||
Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
|
||||
};
|
||||
|
||||
Ok(scalar_value)
|
||||
}
|
||||
}
|
||||
|
||||
fn to_null_value(output_type: &ConcreteDataType) -> ScalarValue {
|
||||
match output_type {
|
||||
ConcreteDataType::Null(_) => ScalarValue::Null,
|
||||
ConcreteDataType::Boolean(_) => ScalarValue::Boolean(None),
|
||||
ConcreteDataType::Int8(_) => ScalarValue::Int8(None),
|
||||
ConcreteDataType::Int16(_) => ScalarValue::Int16(None),
|
||||
ConcreteDataType::Int32(_) => ScalarValue::Int32(None),
|
||||
ConcreteDataType::Int64(_) => ScalarValue::Int64(None),
|
||||
ConcreteDataType::UInt8(_) => ScalarValue::UInt8(None),
|
||||
ConcreteDataType::UInt16(_) => ScalarValue::UInt16(None),
|
||||
ConcreteDataType::UInt32(_) => ScalarValue::UInt32(None),
|
||||
ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
|
||||
ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
|
||||
ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
|
||||
ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
|
||||
ConcreteDataType::String(_) => ScalarValue::Utf8(None),
|
||||
ConcreteDataType::Date(_) => ScalarValue::Date32(None),
|
||||
ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
|
||||
ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None),
|
||||
ConcreteDataType::List(_) => {
|
||||
ScalarValue::List(None, Box::new(new_item_field(output_type.as_arrow_type())))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_item_field(data_type: ArrowDataType) -> Field {
|
||||
Field::new("item", data_type, false)
|
||||
}
|
||||
|
||||
fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
|
||||
match unit {
|
||||
TimeUnit::Second => ScalarValue::TimestampSecond(val, None),
|
||||
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(val, None),
|
||||
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(val, None),
|
||||
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(val, None),
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_ord_for_value_like {
|
||||
@@ -366,6 +449,24 @@ impl ListValue {
|
||||
pub fn datatype(&self) -> &ConcreteDataType {
|
||||
&self.datatype
|
||||
}
|
||||
|
||||
fn try_to_scalar_value(&self, output_type: &ListType) -> Result<ScalarValue> {
|
||||
let vs = if let Some(items) = self.items() {
|
||||
Some(
|
||||
items
|
||||
.iter()
|
||||
.map(|v| v.try_to_scalar_value(output_type.item_type()))
|
||||
.collect::<Result<Vec<_>>>()?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(ScalarValue::List(
|
||||
vs,
|
||||
Box::new(new_item_field(output_type.as_arrow_type())),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ListValue {
|
||||
@@ -1272,4 +1373,218 @@ mod tests {
|
||||
"TimestampNanosecondType[]"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_not_null_value_to_scalar_value() {
|
||||
assert_eq!(
|
||||
ScalarValue::Boolean(Some(true)),
|
||||
Value::Boolean(true)
|
||||
.try_to_scalar_value(&ConcreteDataType::boolean_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Boolean(Some(false)),
|
||||
Value::Boolean(false)
|
||||
.try_to_scalar_value(&ConcreteDataType::boolean_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt8(Some(u8::MIN + 1)),
|
||||
Value::UInt8(u8::MIN + 1)
|
||||
.try_to_scalar_value(&ConcreteDataType::uint8_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt16(Some(u16::MIN + 2)),
|
||||
Value::UInt16(u16::MIN + 2)
|
||||
.try_to_scalar_value(&ConcreteDataType::uint16_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt32(Some(u32::MIN + 3)),
|
||||
Value::UInt32(u32::MIN + 3)
|
||||
.try_to_scalar_value(&ConcreteDataType::uint32_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt64(Some(u64::MIN + 4)),
|
||||
Value::UInt64(u64::MIN + 4)
|
||||
.try_to_scalar_value(&ConcreteDataType::uint64_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int8(Some(i8::MIN + 4)),
|
||||
Value::Int8(i8::MIN + 4)
|
||||
.try_to_scalar_value(&ConcreteDataType::int8_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int16(Some(i16::MIN + 5)),
|
||||
Value::Int16(i16::MIN + 5)
|
||||
.try_to_scalar_value(&ConcreteDataType::int16_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int32(Some(i32::MIN + 6)),
|
||||
Value::Int32(i32::MIN + 6)
|
||||
.try_to_scalar_value(&ConcreteDataType::int32_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int64(Some(i64::MIN + 7)),
|
||||
Value::Int64(i64::MIN + 7)
|
||||
.try_to_scalar_value(&ConcreteDataType::int64_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float32(Some(8.0f32)),
|
||||
Value::Float32(OrderedFloat(8.0f32))
|
||||
.try_to_scalar_value(&ConcreteDataType::float32_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float64(Some(9.0f64)),
|
||||
Value::Float64(OrderedFloat(9.0f64))
|
||||
.try_to_scalar_value(&ConcreteDataType::float64_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Utf8(Some("hello".to_string())),
|
||||
Value::String(StringBytes::from("hello"))
|
||||
.try_to_scalar_value(&ConcreteDataType::string_datatype(),)
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
|
||||
Value::Binary(Bytes::from("world".as_bytes()))
|
||||
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_null_value_to_scalar_value() {
|
||||
assert_eq!(
|
||||
ScalarValue::Boolean(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::boolean_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt8(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::uint8_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt16(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::uint16_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt32(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::uint32_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::UInt64(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::uint64_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int8(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::int8_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int16(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::int16_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int32(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::int32_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Int64(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::int64_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float32(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::float32_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Float64(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::float64_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::Utf8(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::string_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::LargeBinary(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
|
||||
.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_value_to_scalar_value() {
|
||||
let items = Some(Box::new(vec![Value::Int32(-1), Value::Null]));
|
||||
let list = Value::List(ListValue::new(items, ConcreteDataType::int32_datatype()));
|
||||
let df_list = list
|
||||
.try_to_scalar_value(&ConcreteDataType::list_datatype(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
))
|
||||
.unwrap();
|
||||
assert!(matches!(df_list, ScalarValue::List(_, _)));
|
||||
match df_list {
|
||||
ScalarValue::List(vs, field) => {
|
||||
assert_eq!(ArrowDataType::Int32, *field.data_type());
|
||||
|
||||
let vs = vs.unwrap();
|
||||
assert_eq!(
|
||||
vs,
|
||||
vec![ScalarValue::Int32(Some(-1)), ScalarValue::Int32(None)]
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timestamp_to_scalar_value() {
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampSecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Second, Some(1))
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampMillisecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Millisecond, Some(1))
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampMicrosecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Microsecond, Some(1))
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::TimestampNanosecond(Some(1), None),
|
||||
timestamp_to_scalar_value(TimeUnit::Nanosecond, Some(1))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user