feat: adds from_unixtime function (#420)

This commit is contained in:
dennis zhuang
2022-11-08 18:22:00 +08:00
committed by GitHub
parent 857054f70d
commit dd488e8d21
7 changed files with 145 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -1059,6 +1059,7 @@ dependencies = [
"common-error",
"common-function-macro",
"common-query",
"common-time",
"datafusion-common",
"datatypes",
"libc",

View File

@@ -8,6 +8,7 @@ arc-swap = "1.0"
chrono-tz = "0.6"
common-error = { path = "../error" }
common-function-macro = { path = "../function-macro" }
common-time = { path = "../time" }
common-query = { path = "../query" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../../datatypes" }

View File

@@ -6,6 +6,7 @@ pub mod math;
pub mod numpy;
#[cfg(test)]
pub(crate) mod test;
mod timestamp;
pub mod udf;
pub use aggregate::MedianAccumulatorCreator;

View File

@@ -9,6 +9,7 @@ use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::function::FunctionRef;
use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
use crate::scalars::timestamp::TimestampFunction;
#[derive(Default)]
pub struct FunctionRegistry {
@@ -58,6 +59,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
MathFunction::register(&function_registry);
NumpyFunction::register(&function_registry);
TimestampFunction::register(&function_registry);
AggregateFunctions::register(&function_registry);

View File

@@ -0,0 +1,116 @@
//! from_unixtime function.
/// TODO(dennis) It can be removed after we upgrade datafusion.
use std::fmt;
use std::sync::Arc;
use arrow::compute::arithmetics;
use arrow::datatypes::DataType as ArrowDatatype;
use arrow::scalar::PrimitiveScalar;
use common_query::error::{IntoVectorSnafu, UnsupportedInputDataTypeSnafu};
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::TimestampVector;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
use crate::error::Result;
use crate::scalars::function::{Function, FunctionContext};
#[derive(Clone, Debug, Default)]
pub struct FromUnixtimeFunction;
const NAME: &str = "from_unixtime";
impl Function for FromUnixtimeFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::timestamp_millis_datatype())
}
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::int64_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
match columns[0].data_type() {
ConcreteDataType::Int64(_) => {
let array = columns[0].to_arrow_array();
// Our timestamp vector's time unit is millisecond
let array = arithmetics::mul_scalar(
&*array,
&PrimitiveScalar::new(ArrowDatatype::Int64, Some(1000i64)),
);
Ok(Arc::new(
TimestampVector::try_from_arrow_array(array).context(IntoVectorSnafu {
data_type: ArrowDatatype::Int64,
})?,
))
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail()
.map_err(|e| e.into()),
}
}
}
impl fmt::Display for FromUnixtimeFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "FROM_UNIXTIME")
}
}
#[cfg(test)]
mod tests {
use common_query::prelude::TypeSignature;
use datatypes::value::Value;
use datatypes::vectors::Int64Vector;
use super::*;
#[test]
fn test_from_unixtime() {
let f = FromUnixtimeFunction::default();
assert_eq!("from_unixtime", f.name());
assert_eq!(
ConcreteDataType::timestamp_millis_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::int64_datatype()]
));
let times = vec![Some(1494410783), None, Some(1494410983)];
let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(3, vector.len());
for (i, t) in times.iter().enumerate() {
let v = vector.get(i);
if i == 1 {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::Timestamp(ts) => {
assert_eq!(ts.value(), t.unwrap() * 1000);
}
_ => unreachable!(),
}
}
}
}

View File

@@ -0,0 +1,14 @@
use std::sync::Arc;
mod from_unixtime;
use from_unixtime::FromUnixtimeFunction;
use crate::scalars::function_registry::FunctionRegistry;
pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(FromUnixtimeFunction::default()));
}
}

View File

@@ -4,6 +4,7 @@ use arrow::datatypes::DataType as ArrowDatatype;
use common_error::prelude::*;
use datafusion_common::DataFusionError;
use datatypes::error::Error as DataTypeError;
use datatypes::prelude::ConcreteDataType;
use statrs::StatsError;
common_error::define_opaque_error!(Error);
@@ -17,6 +18,13 @@ pub enum InnerError {
backtrace: Backtrace,
},
#[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
UnsupportedInputDataType {
function: String,
datatypes: Vec<ConcreteDataType>,
backtrace: Backtrace,
},
#[snafu(display("Fail to generate function, source: {}", source))]
GenerateFunction {
source: StatsError,
@@ -116,6 +124,8 @@ impl ErrorExt for InnerError {
| InnerError::GeneralDataFusion { .. }
| InnerError::DataFusionExecutionPlan { .. } => StatusCode::Unexpected,
InnerError::UnsupportedInputDataType { .. } => StatusCode::InvalidArguments,
InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
}
}