diff --git a/src/common/function/src/scalars/timestamp/to_unixtime.rs b/src/common/function/src/scalars/timestamp/to_unixtime.rs index 9970565d34..0bd8c2255e 100644 --- a/src/common/function/src/scalars/timestamp/to_unixtime.rs +++ b/src/common/function/src/scalars/timestamp/to_unixtime.rs @@ -18,36 +18,50 @@ use std::sync::Arc; use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu}; use common_query::prelude::{Signature, Volatility}; -use common_time::timestamp::TimeUnit; -use common_time::Timestamp; +use common_time::{Date, DateTime, Timestamp}; use datatypes::prelude::ConcreteDataType; -use datatypes::types::TimestampType; -use datatypes::vectors::{ - Int64Vector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector, - TimestampNanosecondVector, TimestampSecondVector, Vector, VectorRef, -}; +use datatypes::vectors::{Int64Vector, VectorRef}; use snafu::ensure; use crate::scalars::function::{Function, FunctionContext}; +/// A function to convert the column into the unix timestamp in seconds. #[derive(Clone, Debug, Default)] pub struct ToUnixtimeFunction; const NAME: &str = "to_unixtime"; fn convert_to_seconds(arg: &str) -> Option { - match Timestamp::from_str(arg) { - Ok(ts) => { - let sec_mul = (TimeUnit::Second.factor() / ts.unit().factor()) as i64; - Some(ts.value().div_euclid(sec_mul)) - } - Err(_err) => None, + if let Ok(dt) = DateTime::from_str(arg) { + return Some(dt.val() / 1000); } + + if let Ok(ts) = Timestamp::from_str(arg) { + return Some(ts.split().0); + } + + if let Ok(date) = Date::from_str(arg) { + return Some(date.to_secs()); + } + + None } -fn process_vector(vector: &dyn Vector) -> Vec> { +fn convert_timestamps_to_seconds(vector: &VectorRef) -> Vec> { (0..vector.len()) - .map(|i| paste::expr!((vector.get(i)).as_timestamp().map(|ts| ts.value()))) + .map(|i| vector.get(i).as_timestamp().map(|ts| ts.split().0)) + .collect::>>() +} + +fn convert_dates_to_seconds(vector: &VectorRef) -> Vec> { + (0..vector.len()) + .map(|i| vector.get(i).as_date().map(|dt| dt.to_secs())) + .collect::>>() +} + +fn convert_datetimes_to_seconds(vector: &VectorRef) -> Vec> { + (0..vector.len()) + .map(|i| vector.get(i).as_datetime().map(|dt| dt.val() / 1000)) .collect::>>() } @@ -67,6 +81,8 @@ impl Function for ToUnixtimeFunction { ConcreteDataType::string_datatype(), ConcreteDataType::int32_datatype(), ConcreteDataType::int64_datatype(), + ConcreteDataType::date_datatype(), + ConcreteDataType::datetime_datatype(), ConcreteDataType::timestamp_second_datatype(), ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::timestamp_microsecond_datatype(), @@ -87,51 +103,29 @@ impl Function for ToUnixtimeFunction { } ); + let vector = &columns[0]; + match columns[0].data_type() { - ConcreteDataType::String(_) => { - let array = columns[0].to_arrow_array(); - let vector = StringVector::try_from_arrow_array(&array).unwrap(); - Ok(Arc::new(Int64Vector::from( - (0..vector.len()) - .map(|i| convert_to_seconds(&vector.get(i).to_string())) - .collect::>(), - ))) - } + ConcreteDataType::String(_) => Ok(Arc::new(Int64Vector::from( + (0..vector.len()) + .map(|i| convert_to_seconds(&vector.get(i).to_string())) + .collect::>(), + ))), ConcreteDataType::Int64(_) | ConcreteDataType::Int32(_) => { - let array = columns[0].to_arrow_array(); - Ok(Arc::new(Int64Vector::try_from_arrow_array(&array).unwrap())) + // Safety: cast always successfully at here + Ok(vector.cast(&ConcreteDataType::int64_datatype()).unwrap()) } - ConcreteDataType::Timestamp(ts) => { - let array = columns[0].to_arrow_array(); - let value = match ts { - TimestampType::Second(_) => { - let vector = paste::expr!(TimestampSecondVector::try_from_arrow_array( - array - ) - .unwrap()); - process_vector(&vector) - } - TimestampType::Millisecond(_) => { - let vector = paste::expr!( - TimestampMillisecondVector::try_from_arrow_array(array).unwrap() - ); - process_vector(&vector) - } - TimestampType::Microsecond(_) => { - let vector = paste::expr!( - TimestampMicrosecondVector::try_from_arrow_array(array).unwrap() - ); - process_vector(&vector) - } - TimestampType::Nanosecond(_) => { - let vector = paste::expr!(TimestampNanosecondVector::try_from_arrow_array( - array - ) - .unwrap()); - process_vector(&vector) - } - }; - Ok(Arc::new(Int64Vector::from(value))) + ConcreteDataType::Date(_) => { + let seconds = convert_dates_to_seconds(vector); + Ok(Arc::new(Int64Vector::from(seconds))) + } + ConcreteDataType::DateTime(_) => { + let seconds = convert_datetimes_to_seconds(vector); + Ok(Arc::new(Int64Vector::from(seconds))) + } + ConcreteDataType::Timestamp(_) => { + let seconds = convert_timestamps_to_seconds(vector); + Ok(Arc::new(Int64Vector::from(seconds))) } _ => UnsupportedInputDataTypeSnafu { function: NAME, @@ -151,11 +145,11 @@ impl fmt::Display for ToUnixtimeFunction { #[cfg(test)] mod tests { use common_query::prelude::TypeSignature; - use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder}; - use datatypes::scalars::ScalarVector; - use datatypes::timestamp::TimestampSecond; + use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; - use datatypes::vectors::{StringVector, TimestampSecondVector}; + use datatypes::vectors::{ + DateTimeVector, DateVector, StringVector, TimestampMillisecondVector, TimestampSecondVector, + }; use super::{ToUnixtimeFunction, *}; use crate::scalars::Function; @@ -170,18 +164,20 @@ mod tests { ); assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::Uniform(1, valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![ - ConcreteDataType::string_datatype(), - ConcreteDataType::int32_datatype(), - ConcreteDataType::int64_datatype(), - ConcreteDataType::timestamp_second_datatype(), - ConcreteDataType::timestamp_millisecond_datatype(), - ConcreteDataType::timestamp_microsecond_datatype(), - ConcreteDataType::timestamp_nanosecond_datatype(), - ] + Signature { + type_signature: TypeSignature::Uniform(1, valid_types), + volatility: Volatility::Immutable + } if valid_types == vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::date_datatype(), + ConcreteDataType::datetime_datatype(), + ConcreteDataType::timestamp_second_datatype(), + ConcreteDataType::timestamp_millisecond_datatype(), + ConcreteDataType::timestamp_microsecond_datatype(), + ConcreteDataType::timestamp_nanosecond_datatype(), + ] )); let times = vec![ @@ -212,26 +208,6 @@ mod tests { #[test] fn test_int_to_unixtime() { let f = ToUnixtimeFunction; - assert_eq!("to_unixtime", f.name()); - assert_eq!( - ConcreteDataType::int64_datatype(), - f.return_type(&[]).unwrap() - ); - - assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::Uniform(1, valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![ - ConcreteDataType::string_datatype(), - ConcreteDataType::int32_datatype(), - ConcreteDataType::int64_datatype(), - ConcreteDataType::timestamp_second_datatype(), - ConcreteDataType::timestamp_millisecond_datatype(), - ConcreteDataType::timestamp_microsecond_datatype(), - ConcreteDataType::timestamp_nanosecond_datatype(), - ] - )); let times = vec![Some(3_i64), None, Some(5_i64), None]; let results = [Some(3), None, Some(5), None]; @@ -254,38 +230,13 @@ mod tests { } #[test] - fn test_timestamp_to_unixtime() { + fn test_date_to_unixtime() { let f = ToUnixtimeFunction; - assert_eq!("to_unixtime", f.name()); - assert_eq!( - ConcreteDataType::int64_datatype(), - f.return_type(&[]).unwrap() - ); - assert!(matches!(f.signature(), - Signature { - type_signature: TypeSignature::Uniform(1, valid_types), - volatility: Volatility::Immutable - } if valid_types == vec![ - ConcreteDataType::string_datatype(), - ConcreteDataType::int32_datatype(), - ConcreteDataType::int64_datatype(), - ConcreteDataType::timestamp_second_datatype(), - ConcreteDataType::timestamp_millisecond_datatype(), - ConcreteDataType::timestamp_microsecond_datatype(), - ConcreteDataType::timestamp_nanosecond_datatype(), - ] - )); - - let times: Vec> = vec![ - Some(TimestampSecond::new(123)), - None, - Some(TimestampSecond::new(42)), - None, - ]; - let results = [Some(123), None, Some(42), None]; - let ts_vector: TimestampSecondVector = build_vector_from_slice(×); - let args: Vec = vec![Arc::new(ts_vector)]; + let times = vec![Some(123), None, Some(42), None]; + let results = [Some(10627200), None, Some(3628800), None]; + let date_vector = DateVector::from(times.clone()); + let args: Vec = vec![Arc::new(date_vector)]; let vector = f.eval(FunctionContext::default(), &args).unwrap(); assert_eq!(4, vector.len()); for (i, _t) in times.iter().enumerate() { @@ -303,11 +254,73 @@ mod tests { } } - fn build_vector_from_slice(items: &[Option>]) -> T { - let mut builder = T::Builder::with_capacity(items.len()); - for item in items { - builder.push(*item); + #[test] + fn test_datetime_to_unixtime() { + let f = ToUnixtimeFunction; + + let times = vec![Some(123000), None, Some(42000), None]; + let results = [Some(123), None, Some(42), None]; + let date_vector = DateTimeVector::from(times.clone()); + let args: Vec = vec![Arc::new(date_vector)]; + let vector = f.eval(FunctionContext::default(), &args).unwrap(); + assert_eq!(4, vector.len()); + for (i, _t) in times.iter().enumerate() { + let v = vector.get(i); + if i == 1 || i == 3 { + assert_eq!(Value::Null, v); + continue; + } + match v { + Value::Int64(ts) => { + assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); + } + _ => unreachable!(), + } + } + } + + #[test] + fn test_timestamp_to_unixtime() { + let f = ToUnixtimeFunction; + + let times = vec![Some(123), None, Some(42), None]; + let results = [Some(123), None, Some(42), None]; + let ts_vector = TimestampSecondVector::from(times.clone()); + let args: Vec = vec![Arc::new(ts_vector)]; + let vector = f.eval(FunctionContext::default(), &args).unwrap(); + assert_eq!(4, vector.len()); + for (i, _t) in times.iter().enumerate() { + let v = vector.get(i); + if i == 1 || i == 3 { + assert_eq!(Value::Null, v); + continue; + } + match v { + Value::Int64(ts) => { + assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); + } + _ => unreachable!(), + } + } + + let times = vec![Some(123000), None, Some(42000), None]; + let results = [Some(123), None, Some(42), None]; + let ts_vector = TimestampMillisecondVector::from(times.clone()); + let args: Vec = vec![Arc::new(ts_vector)]; + let vector = f.eval(FunctionContext::default(), &args).unwrap(); + assert_eq!(4, vector.len()); + for (i, _t) in times.iter().enumerate() { + let v = vector.get(i); + if i == 1 || i == 3 { + assert_eq!(Value::Null, v); + continue; + } + match v { + Value::Int64(ts) => { + assert_eq!(ts, (*results.get(i).unwrap()).unwrap()); + } + _ => unreachable!(), + } } - builder.finish() } } diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 147064d674..1036b86a22 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -182,7 +182,7 @@ impl Timestamp { /// Split a [Timestamp] into seconds part and nanoseconds part. /// Notice the seconds part of split result is always rounded down to floor. - fn split(&self) -> (i64, u32) { + pub fn split(&self) -> (i64, u32) { let sec_mul = (TimeUnit::Second.factor() / self.unit.factor()) as i64; let nsec_mul = (self.unit.factor() / TimeUnit::Nanosecond.factor()) as i64; diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 64affdea70..024f45f2b3 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -203,6 +203,22 @@ impl Value { } } + /// Cast Value to Date. Return None if value is not a valid date data type. + pub fn as_date(&self) -> Option { + match self { + Value::Date(t) => Some(*t), + _ => None, + } + } + + /// Cast Value to DateTime. Return None if value is not a valid datetime data type. + pub fn as_datetime(&self) -> Option { + match self { + Value::DateTime(t) => Some(*t), + _ => None, + } + } + /// Cast Value to [Time]. Return None if value is not a valid time data type. pub fn as_time(&self) -> Option