# Patch: add a `from_unixtime` scalar function (Int64 seconds -> millisecond timestamp),
# register it through a new `TimestampFunction` group, and add the
# `UnsupportedInputDataType` error variant (mapped to `StatusCode::InvalidArguments`).
#
# NOTE(review): this patch text was restored from a copy in which line breaks were
# collapsed and `<...>` generic arguments were stripped. Hunk line counts were
# re-checked against every `@@` header, but the generic arguments
# (`Result<ConcreteDataType>`, `Result<VectorRef>`, `collect::<Vec<_>>()`,
# `Vec<VectorRef>`, `Vec<ConcreteDataType>`, `Lazy<Arc<FunctionRegistry>>`), the
# indentation of context lines and the position of blank context lines are
# reconstructed from usage -- confirm against the upstream commit before applying.
#
# Review findings (left unchanged so the payload matches the recorded blob hashes):
#   * from_unixtime.rs: `eval` indexes `columns[0]` without a length check, so a direct
#     call with an empty slice panics instead of returning an error.
#   * from_unixtime.rs: `/// TODO(dennis) ...` is an outer doc comment and therefore
#     attaches to `use std::fmt;`; a plain `//` comment was presumably intended.
#   * from_unixtime.rs: seconds are scaled with `mul_scalar(.., 1000)`; behaviour on
#     i64 overflow depends on arrow's `mul_scalar` -- TODO confirm.
#   * Cargo.toml: `common-time` is inserted before `common-query`, out of the
#     alphabetical order of its neighbours, and no Rust code in this patch uses it.
#
diff --git a/Cargo.lock b/Cargo.lock
index bb77552c70..ade6eba31d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1059,6 +1059,7 @@ dependencies = [
  "common-error",
  "common-function-macro",
  "common-query",
+ "common-time",
  "datafusion-common",
  "datatypes",
  "libc",
diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml
index 87be4aef8f..b5f118dfbd 100644
--- a/src/common/function/Cargo.toml
+++ b/src/common/function/Cargo.toml
@@ -8,6 +8,7 @@ arc-swap = "1.0"
 chrono-tz = "0.6"
 common-error = { path = "../error" }
 common-function-macro = { path = "../function-macro" }
+common-time = { path = "../time" }
 common-query = { path = "../query" }
 datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
 datatypes = { path = "../../datatypes" }
diff --git a/src/common/function/src/scalars.rs b/src/common/function/src/scalars.rs
index 2773daec11..f59ed52db6 100644
--- a/src/common/function/src/scalars.rs
+++ b/src/common/function/src/scalars.rs
@@ -6,6 +6,7 @@ pub mod math;
 pub mod numpy;
 #[cfg(test)]
 pub(crate) mod test;
+mod timestamp;
 pub mod udf;
 
 pub use aggregate::MedianAccumulatorCreator;
diff --git a/src/common/function/src/scalars/function_registry.rs b/src/common/function/src/scalars/function_registry.rs
index fb2a99ef02..7e70018a7d 100644
--- a/src/common/function/src/scalars/function_registry.rs
+++ b/src/common/function/src/scalars/function_registry.rs
@@ -9,6 +9,7 @@ use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
 use crate::scalars::function::FunctionRef;
 use crate::scalars::math::MathFunction;
 use crate::scalars::numpy::NumpyFunction;
+use crate::scalars::timestamp::TimestampFunction;
 
 #[derive(Default)]
 pub struct FunctionRegistry {
@@ -58,6 +59,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
 
     MathFunction::register(&function_registry);
     NumpyFunction::register(&function_registry);
+    TimestampFunction::register(&function_registry);
 
     AggregateFunctions::register(&function_registry);
 
diff --git a/src/common/function/src/scalars/timestamp/from_unixtime.rs b/src/common/function/src/scalars/timestamp/from_unixtime.rs
new file mode 100644
index 0000000000..68cb995114
--- /dev/null
+++ b/src/common/function/src/scalars/timestamp/from_unixtime.rs
@@ -0,0 +1,116 @@
+//! from_unixtime function.
+/// TODO(dennis) It can be removed after we upgrade datafusion.
+use std::fmt;
+use std::sync::Arc;
+
+use arrow::compute::arithmetics;
+use arrow::datatypes::DataType as ArrowDatatype;
+use arrow::scalar::PrimitiveScalar;
+use common_query::error::{IntoVectorSnafu, UnsupportedInputDataTypeSnafu};
+use common_query::prelude::{Signature, Volatility};
+use datatypes::prelude::ConcreteDataType;
+use datatypes::vectors::TimestampVector;
+use datatypes::vectors::VectorRef;
+use snafu::ResultExt;
+
+use crate::error::Result;
+use crate::scalars::function::{Function, FunctionContext};
+
+#[derive(Clone, Debug, Default)]
+pub struct FromUnixtimeFunction;
+
+const NAME: &str = "from_unixtime";
+
+impl Function for FromUnixtimeFunction {
+    fn name(&self) -> &str {
+        NAME
+    }
+
+    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
+        Ok(ConcreteDataType::timestamp_millis_datatype())
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::uniform(
+            1,
+            vec![ConcreteDataType::int64_datatype()],
+            Volatility::Immutable,
+        )
+    }
+
+    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
+        match columns[0].data_type() {
+            ConcreteDataType::Int64(_) => {
+                let array = columns[0].to_arrow_array();
+                // Our timestamp vector's time unit is millisecond
+                let array = arithmetics::mul_scalar(
+                    &*array,
+                    &PrimitiveScalar::new(ArrowDatatype::Int64, Some(1000i64)),
+                );
+
+                Ok(Arc::new(
+                    TimestampVector::try_from_arrow_array(array).context(IntoVectorSnafu {
+                        data_type: ArrowDatatype::Int64,
+                    })?,
+                ))
+            }
+            _ => UnsupportedInputDataTypeSnafu {
+                function: NAME,
+                datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
+            }
+            .fail()
+            .map_err(|e| e.into()),
+        }
+    }
+}
+
+impl fmt::Display for FromUnixtimeFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "FROM_UNIXTIME")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_query::prelude::TypeSignature;
+    use datatypes::value::Value;
+    use datatypes::vectors::Int64Vector;
+
+    use super::*;
+
+    #[test]
+    fn test_from_unixtime() {
+        let f = FromUnixtimeFunction::default();
+        assert_eq!("from_unixtime", f.name());
+        assert_eq!(
+            ConcreteDataType::timestamp_millis_datatype(),
+            f.return_type(&[]).unwrap()
+        );
+
+        assert!(matches!(f.signature(),
+                         Signature {
+                             type_signature: TypeSignature::Uniform(1, valid_types),
+                             volatility: Volatility::Immutable
+                         } if valid_types == vec![ConcreteDataType::int64_datatype()]
+        ));
+
+        let times = vec![Some(1494410783), None, Some(1494410983)];
+        let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];
+
+        let vector = f.eval(FunctionContext::default(), &args).unwrap();
+        assert_eq!(3, vector.len());
+        for (i, t) in times.iter().enumerate() {
+            let v = vector.get(i);
+            if i == 1 {
+                assert_eq!(Value::Null, v);
+                continue;
+            }
+            match v {
+                Value::Timestamp(ts) => {
+                    assert_eq!(ts.value(), t.unwrap() * 1000);
+                }
+                _ => unreachable!(),
+            }
+        }
+    }
+}
diff --git a/src/common/function/src/scalars/timestamp/mod.rs b/src/common/function/src/scalars/timestamp/mod.rs
new file mode 100644
index 0000000000..5648b31389
--- /dev/null
+++ b/src/common/function/src/scalars/timestamp/mod.rs
@@ -0,0 +1,14 @@
+use std::sync::Arc;
+mod from_unixtime;
+
+use from_unixtime::FromUnixtimeFunction;
+
+use crate::scalars::function_registry::FunctionRegistry;
+
+pub(crate) struct TimestampFunction;
+
+impl TimestampFunction {
+    pub fn register(registry: &FunctionRegistry) {
+        registry.register(Arc::new(FromUnixtimeFunction::default()));
+    }
+}
diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs
index 0231f7cdc0..1e29da234c 100644
--- a/src/common/query/src/error.rs
+++ b/src/common/query/src/error.rs
@@ -4,6 +4,7 @@ use arrow::datatypes::DataType as ArrowDatatype;
 use common_error::prelude::*;
 use datafusion_common::DataFusionError;
 use datatypes::error::Error as DataTypeError;
+use datatypes::prelude::ConcreteDataType;
 use statrs::StatsError;
 
 common_error::define_opaque_error!(Error);
@@ -17,6 +18,13 @@ pub enum InnerError {
         backtrace: Backtrace,
     },
 
+    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
+    UnsupportedInputDataType {
+        function: String,
+        datatypes: Vec<ConcreteDataType>,
+        backtrace: Backtrace,
+    },
+
     #[snafu(display("Fail to generate function, source: {}", source))]
     GenerateFunction {
         source: StatsError,
@@ -116,6 +124,8 @@ impl ErrorExt for InnerError {
             | InnerError::GeneralDataFusion { .. }
             | InnerError::DataFusionExecutionPlan { .. } => StatusCode::Unexpected,
 
+            InnerError::UnsupportedInputDataType { .. } => StatusCode::InvalidArguments,
+
             InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
         }
     }