feat!: implement interval type by multiple structs (#4772)

* define structs and methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: re-implement interval types in time crate

* feat: use new

* feat: interval value

* feat: query crate interval

* feat: pg and mysql interval

* chore: remove unused imports

* chore: remove commented codes

* feat: make flow compile but may not work

* feat: flow datetime

* test: fix some tests

* test: fix some flow tests(WIP)

* chore: some fix test&docs

* fix: change interval order

* chore: remove unused codes

* chore: fix cilppy

* chore: now signature change

* chore: remove todo

* feat: update error message

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
This commit is contained in:
Yingwen
2024-10-14 11:09:03 +08:00
committed by GitHub
parent 856c0280f5
commit 2f2b4b306c
30 changed files with 1190 additions and 1228 deletions

View File

@@ -17,10 +17,11 @@ use std::sync::Arc;
use common_base::BitVec;
use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION};
use common_decimal::Decimal128;
use common_time::interval::IntervalUnit;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Interval, Timestamp};
use common_time::{
Date, DateTime, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::scalars::ScalarVector;
use datatypes::types::{
@@ -456,13 +457,11 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
TimeUnit::Microsecond => values.time_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.time_nanosecond_values.push(val.value()),
},
Value::Interval(val) => match val.unit() {
IntervalUnit::YearMonth => values.interval_year_month_values.push(val.to_i32()),
IntervalUnit::DayTime => values.interval_day_time_values.push(val.to_i64()),
IntervalUnit::MonthDayNano => values
.interval_month_day_nano_values
.push(convert_i128_to_interval(val.to_i128())),
},
Value::IntervalYearMonth(val) => values.interval_year_month_values.push(val.to_i32()),
Value::IntervalDayTime(val) => values.interval_day_time_values.push(val.to_i64()),
Value::IntervalMonthDayNano(val) => values
.interval_month_day_nano_values
.push(convert_month_day_nano_to_pb(val)),
Value::Decimal128(val) => values.decimal128_values.push(convert_to_pb_decimal128(val)),
Value::List(_) | Value::Duration(_) => unreachable!(),
});
@@ -507,14 +506,12 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
}
}
/// Converts an i128 value to google protobuf type [IntervalMonthDayNano].
pub fn convert_i128_to_interval(v: i128) -> v1::IntervalMonthDayNano {
let interval = Interval::from_i128(v);
let (months, days, nanoseconds) = interval.to_month_day_nano();
/// Converts an interval to google protobuf type [IntervalMonthDayNano].
pub fn convert_month_day_nano_to_pb(v: IntervalMonthDayNano) -> v1::IntervalMonthDayNano {
v1::IntervalMonthDayNano {
months,
days,
nanoseconds,
months: v.months,
days: v.days,
nanoseconds: v.nanoseconds,
}
}
@@ -562,11 +559,15 @@ pub fn pb_value_to_value_ref<'a>(
ValueData::TimeMillisecondValue(t) => ValueRef::Time(Time::new_millisecond(*t)),
ValueData::TimeMicrosecondValue(t) => ValueRef::Time(Time::new_microsecond(*t)),
ValueData::TimeNanosecondValue(t) => ValueRef::Time(Time::new_nanosecond(*t)),
ValueData::IntervalYearMonthValue(v) => ValueRef::Interval(Interval::from_i32(*v)),
ValueData::IntervalDayTimeValue(v) => ValueRef::Interval(Interval::from_i64(*v)),
ValueData::IntervalYearMonthValue(v) => {
ValueRef::IntervalYearMonth(IntervalYearMonth::from_i32(*v))
}
ValueData::IntervalDayTimeValue(v) => {
ValueRef::IntervalDayTime(IntervalDayTime::from_i64(*v))
}
ValueData::IntervalMonthDayNanoValue(v) => {
let interval = Interval::from_month_day_nano(v.months, v.days, v.nanoseconds);
ValueRef::Interval(interval)
let interval = IntervalMonthDayNano::new(v.months, v.days, v.nanoseconds);
ValueRef::IntervalMonthDayNano(interval)
}
ValueData::Decimal128Value(v) => {
// get precision and scale from datatype_extension
@@ -657,7 +658,7 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
IntervalType::MonthDayNano(_) => {
Arc::new(IntervalMonthDayNanoVector::from_iter_values(
values.interval_month_day_nano_values.iter().map(|x| {
Interval::from_month_day_nano(x.months, x.days, x.nanoseconds).to_i128()
IntervalMonthDayNano::new(x.months, x.days, x.nanoseconds).to_i128()
}),
))
}
@@ -802,18 +803,18 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => values
.interval_year_month_values
.into_iter()
.map(|v| Value::Interval(Interval::from_i32(v)))
.map(|v| Value::IntervalYearMonth(IntervalYearMonth::from_i32(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::DayTime(_)) => values
.interval_day_time_values
.into_iter()
.map(|v| Value::Interval(Interval::from_i64(v)))
.map(|v| Value::IntervalDayTime(IntervalDayTime::from_i64(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => values
.interval_month_day_nano_values
.into_iter()
.map(|v| {
Value::Interval(Interval::from_month_day_nano(
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(
v.months,
v.days,
v.nanoseconds,
@@ -941,18 +942,16 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
value_data: Some(ValueData::TimeNanosecondValue(v.value())),
},
},
Value::Interval(v) => match v.unit() {
IntervalUnit::YearMonth => v1::Value {
value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())),
},
IntervalUnit::DayTime => v1::Value {
value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())),
},
IntervalUnit::MonthDayNano => v1::Value {
value_data: Some(ValueData::IntervalMonthDayNanoValue(
convert_i128_to_interval(v.to_i128()),
)),
},
Value::IntervalYearMonth(v) => v1::Value {
value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())),
},
Value::IntervalDayTime(v) => v1::Value {
value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())),
},
Value::IntervalMonthDayNano(v) => v1::Value {
value_data: Some(ValueData::IntervalMonthDayNanoValue(
convert_month_day_nano_to_pb(v),
)),
},
Value::Decimal128(v) => v1::Value {
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
@@ -1044,13 +1043,11 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
TimeUnit::Microsecond => ValueData::TimeMicrosecondValue(v.value()),
TimeUnit::Nanosecond => ValueData::TimeNanosecondValue(v.value()),
}),
Value::Interval(v) => Some(match v.unit() {
IntervalUnit::YearMonth => ValueData::IntervalYearMonthValue(v.to_i32()),
IntervalUnit::DayTime => ValueData::IntervalDayTimeValue(v.to_i64()),
IntervalUnit::MonthDayNano => {
ValueData::IntervalMonthDayNanoValue(convert_i128_to_interval(v.to_i128()))
}
}),
Value::IntervalYearMonth(v) => Some(ValueData::IntervalYearMonthValue(v.to_i32())),
Value::IntervalDayTime(v) => Some(ValueData::IntervalDayTimeValue(v.to_i64())),
Value::IntervalMonthDayNano(v) => Some(ValueData::IntervalMonthDayNanoValue(
convert_month_day_nano_to_pb(v),
)),
Value::Decimal128(v) => Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
Value::List(_) | Value::Duration(_) => unreachable!(),
},
@@ -1061,6 +1058,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
mod tests {
use std::sync::Arc;
use common_time::interval::IntervalUnit;
use datatypes::types::{
Int32Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType,
@@ -1506,11 +1504,11 @@ mod tests {
#[test]
fn test_convert_i128_to_interval() {
let i128_val = 3000;
let interval = convert_i128_to_interval(i128_val);
let i128_val = 3;
let interval = convert_month_day_nano_to_pb(IntervalMonthDayNano::from_i128(i128_val));
assert_eq!(interval.months, 0);
assert_eq!(interval.days, 0);
assert_eq!(interval.nanoseconds, 3000);
assert_eq!(interval.nanoseconds, 3);
}
#[test]
@@ -1590,9 +1588,9 @@ mod tests {
},
);
let expect = vec![
Value::Interval(Interval::from_year_month(1_i32)),
Value::Interval(Interval::from_year_month(2_i32)),
Value::Interval(Interval::from_year_month(3_i32)),
Value::IntervalYearMonth(IntervalYearMonth::new(1_i32)),
Value::IntervalYearMonth(IntervalYearMonth::new(2_i32)),
Value::IntervalYearMonth(IntervalYearMonth::new(3_i32)),
];
assert_eq!(expect, actual);
@@ -1605,9 +1603,9 @@ mod tests {
},
);
let expect = vec![
Value::Interval(Interval::from_i64(1_i64)),
Value::Interval(Interval::from_i64(2_i64)),
Value::Interval(Interval::from_i64(3_i64)),
Value::IntervalDayTime(IntervalDayTime::from_i64(1_i64)),
Value::IntervalDayTime(IntervalDayTime::from_i64(2_i64)),
Value::IntervalDayTime(IntervalDayTime::from_i64(3_i64)),
];
assert_eq!(expect, actual);
@@ -1636,9 +1634,9 @@ mod tests {
},
);
let expect = vec![
Value::Interval(Interval::from_month_day_nano(1, 2, 3)),
Value::Interval(Interval::from_month_day_nano(5, 6, 7)),
Value::Interval(Interval::from_month_day_nano(9, 10, 11)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 2, 3)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(5, 6, 7)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(9, 10, 11)),
];
assert_eq!(expect, actual);
}

View File

@@ -14,18 +14,19 @@
use std::fmt;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::error::{ArrowComputeSnafu, IntoVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::data_type::DataType;
use datatypes::arrow::compute::kernels::numeric;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use datatypes::vectors::VectorRef;
use snafu::ensure;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
use crate::helper;
/// A function adds an interval value to Timestamp, Date or DateTime, and return the result.
/// A function adds an interval value to Timestamp, Date, and return the result.
/// The implementation of datetime type is based on Date64 which is incorrect so this function
/// doesn't support the datetime type.
#[derive(Clone, Debug, Default)]
pub struct DateAddFunction;
@@ -44,7 +45,6 @@ impl Function for DateAddFunction {
helper::one_of_sigs2(
vec![
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
@@ -69,64 +69,14 @@ impl Function for DateAddFunction {
}
);
let left = &columns[0];
let right = &columns[1];
let left = columns[0].to_arrow_array();
let right = columns[1].to_arrow_array();
let size = left.len();
let left_datatype = columns[0].data_type();
match left_datatype {
ConcreteDataType::Timestamp(_) => {
let mut result = left_datatype.create_mutable_vector(size);
for i in 0..size {
let ts = left.get(i).as_timestamp();
let interval = right.get(i).as_interval();
let new_ts = match (ts, interval) {
(Some(ts), Some(interval)) => ts.add_interval(interval),
_ => ts,
};
result.push_value_ref(ValueRef::from(new_ts));
}
Ok(result.to_vector())
}
ConcreteDataType::Date(_) => {
let mut result = left_datatype.create_mutable_vector(size);
for i in 0..size {
let date = left.get(i).as_date();
let interval = right.get(i).as_interval();
let new_date = match (date, interval) {
(Some(date), Some(interval)) => date.add_interval(interval),
_ => date,
};
result.push_value_ref(ValueRef::from(new_date));
}
Ok(result.to_vector())
}
ConcreteDataType::DateTime(_) => {
let mut result = left_datatype.create_mutable_vector(size);
for i in 0..size {
let datetime = left.get(i).as_datetime();
let interval = right.get(i).as_interval();
let new_datetime = match (datetime, interval) {
(Some(datetime), Some(interval)) => datetime.add_interval(interval),
_ => datetime,
};
result.push_value_ref(ValueRef::from(new_datetime));
}
Ok(result.to_vector())
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
}
let result = numeric::add(&left, &right).context(ArrowComputeSnafu)?;
let arrow_type = result.data_type().clone();
Helper::try_into_vector(result).context(IntoVectorSnafu {
data_type: arrow_type,
})
}
}
@@ -144,8 +94,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::{
DateTimeVector, DateVector, IntervalDayTimeVector, IntervalYearMonthVector,
TimestampSecondVector,
DateVector, IntervalDayTimeVector, IntervalYearMonthVector, TimestampSecondVector,
};
use super::{DateAddFunction, *};
@@ -168,16 +117,15 @@ mod tests {
ConcreteDataType::date_datatype(),
f.return_type(&[ConcreteDataType::date_datatype()]).unwrap()
);
assert_eq!(
ConcreteDataType::datetime_datatype(),
f.return_type(&[ConcreteDataType::datetime_datatype()])
.unwrap()
);
assert!(matches!(f.signature(),
assert!(
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 18));
} if sigs.len() == 15),
"{:?}",
f.signature()
);
}
#[test]
@@ -243,36 +191,4 @@ mod tests {
}
}
}
#[test]
fn test_datetime_date_add() {
let f = DateAddFunction;
let dates = vec![Some(123), None, Some(42), None];
// Intervals in months
let intervals = vec![1, 2, 3, 1];
let results = [Some(2678400123), None, Some(7776000042), None];
let date_vector = DateTimeVector::from(dates.clone());
let interval_vector = IntervalYearMonthVector::from_vec(intervals);
let args: Vec<VectorRef> = vec![Arc::new(date_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in dates.iter().enumerate() {
let v = vector.get(i);
let result = results.get(i).unwrap();
if result.is_none() {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::DateTime(date) => {
assert_eq!(date.val(), result.unwrap());
}
_ => unreachable!(),
}
}
}
}

View File

@@ -14,18 +14,19 @@
use std::fmt;
use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
use common_query::error::{ArrowComputeSnafu, IntoVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::data_type::DataType;
use datatypes::arrow::compute::kernels::numeric;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use datatypes::vectors::VectorRef;
use snafu::ensure;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
use crate::helper;
/// A function subtracts an interval value to Timestamp, Date or DateTime, and return the result.
/// A function subtracts an interval value to Timestamp, Date, and return the result.
/// The implementation of datetime type is based on Date64 which is incorrect so this function
/// doesn't support the datetime type.
#[derive(Clone, Debug, Default)]
pub struct DateSubFunction;
@@ -44,7 +45,6 @@ impl Function for DateSubFunction {
helper::one_of_sigs2(
vec![
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
@@ -69,65 +69,14 @@ impl Function for DateSubFunction {
}
);
let left = &columns[0];
let right = &columns[1];
let left = columns[0].to_arrow_array();
let right = columns[1].to_arrow_array();
let size = left.len();
let left_datatype = columns[0].data_type();
match left_datatype {
ConcreteDataType::Timestamp(_) => {
let mut result = left_datatype.create_mutable_vector(size);
for i in 0..size {
let ts = left.get(i).as_timestamp();
let interval = right.get(i).as_interval();
let new_ts = match (ts, interval) {
(Some(ts), Some(interval)) => ts.sub_interval(interval),
_ => ts,
};
result.push_value_ref(ValueRef::from(new_ts));
}
Ok(result.to_vector())
}
ConcreteDataType::Date(_) => {
let mut result = left_datatype.create_mutable_vector(size);
for i in 0..size {
let date = left.get(i).as_date();
let interval = right.get(i).as_interval();
let new_date = match (date, interval) {
(Some(date), Some(interval)) => date.sub_interval(interval),
_ => date,
};
result.push_value_ref(ValueRef::from(new_date));
}
Ok(result.to_vector())
}
ConcreteDataType::DateTime(_) => {
let mut result = left_datatype.create_mutable_vector(size);
for i in 0..size {
let datetime = left.get(i).as_datetime();
let interval = right.get(i).as_interval();
let new_datetime = match (datetime, interval) {
(Some(datetime), Some(interval)) => datetime.sub_interval(interval),
_ => datetime,
};
result.push_value_ref(ValueRef::from(new_datetime));
}
Ok(result.to_vector())
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
}
let result = numeric::sub(&left, &right).context(ArrowComputeSnafu)?;
let arrow_type = result.data_type().clone();
Helper::try_into_vector(result).context(IntoVectorSnafu {
data_type: arrow_type,
})
}
}
@@ -145,8 +94,7 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::{
DateTimeVector, DateVector, IntervalDayTimeVector, IntervalYearMonthVector,
TimestampSecondVector,
DateVector, IntervalDayTimeVector, IntervalYearMonthVector, TimestampSecondVector,
};
use super::{DateSubFunction, *};
@@ -174,11 +122,15 @@ mod tests {
f.return_type(&[ConcreteDataType::datetime_datatype()])
.unwrap()
);
assert!(matches!(f.signature(),
assert!(
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 18));
} if sigs.len() == 15),
"{:?}",
f.signature()
);
}
#[test]
@@ -250,42 +202,4 @@ mod tests {
}
}
}
#[test]
fn test_datetime_date_sub() {
let f = DateSubFunction;
let millis_per_month = 3600 * 24 * 30 * 1000;
let dates = vec![
Some(123 * millis_per_month),
None,
Some(42 * millis_per_month),
None,
];
// Intervals in months
let intervals = vec![1, 2, 3, 1];
let results = [Some(316137600000), None, Some(100915200000), None];
let date_vector = DateTimeVector::from(dates.clone());
let interval_vector = IntervalYearMonthVector::from_vec(intervals);
let args: Vec<VectorRef> = vec![Arc::new(date_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in dates.iter().enumerate() {
let v = vector.get(i);
let result = results.get(i).unwrap();
if result.is_none() {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::DateTime(date) => {
assert_eq!(date.val(), result.unwrap());
}
_ => unreachable!(),
}
}
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper::{convert_i128_to_interval, convert_to_pb_decimal128};
use api::helper::{convert_month_day_nano_to_pb, convert_to_pb_decimal128};
use api::v1::column::Values;
use common_base::BitVec;
use datatypes::types::{IntervalType, TimeType, TimestampType, WrapperType};
@@ -211,7 +211,7 @@ pub fn values(arrays: &[VectorRef]) -> Result<Values> {
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)),
IntervalMonthDayNanoVector,
interval_month_day_nano_values,
|x| { convert_i128_to_interval(x.into_native()) }
|x| { convert_month_day_nano_to_pb(x) }
),
(
ConcreteDataType::Decimal128(_),

View File

@@ -14,13 +14,13 @@
use std::fmt::{Display, Formatter, Write};
use chrono::{Datelike, Days, LocalResult, Months, NaiveDate, NaiveTime, TimeZone};
use chrono::{Datelike, Days, LocalResult, Months, NaiveDate, NaiveTime, TimeDelta, TimeZone};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use crate::error::{InvalidDateStrSnafu, ParseDateStrSnafu, Result};
use crate::interval::Interval;
use crate::interval::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use crate::timezone::get_timezone;
use crate::util::datetime_to_utc;
use crate::Timezone;
@@ -134,29 +134,64 @@ impl Date {
(self.0 as i64) * 24 * 3600
}
/// Adds given Interval to the current date.
/// Returns None if the resulting date would be out of range.
pub fn add_interval(&self, interval: Interval) -> Option<Date> {
// FIXME(yingwen): remove add/sub intervals later
/// Adds given [IntervalYearMonth] to the current date.
pub fn add_year_month(&self, interval: IntervalYearMonth) -> Option<Date> {
let naive_date = self.to_chrono_date()?;
let (months, days, _) = interval.to_month_day_nano();
naive_date
.checked_add_months(Months::new(months as u32))?
.checked_add_days(Days::new(days as u64))
.checked_add_months(Months::new(interval.months as u32))
.map(Into::into)
}
/// Subtracts given Interval to the current date.
/// Returns None if the resulting date would be out of range.
pub fn sub_interval(&self, interval: Interval) -> Option<Date> {
/// Adds given [IntervalDayTime] to the current date.
pub fn add_day_time(&self, interval: IntervalDayTime) -> Option<Date> {
let naive_date = self.to_chrono_date()?;
let (months, days, _) = interval.to_month_day_nano();
naive_date
.checked_add_days(Days::new(interval.days as u64))?
.checked_add_signed(TimeDelta::milliseconds(interval.milliseconds as i64))
.map(Into::into)
}
/// Adds given [IntervalMonthDayNano] to the current date.
pub fn add_month_day_nano(&self, interval: IntervalMonthDayNano) -> Option<Date> {
let naive_date = self.to_chrono_date()?;
naive_date
.checked_sub_months(Months::new(months as u32))?
.checked_sub_days(Days::new(days as u64))
.checked_add_months(Months::new(interval.months as u32))?
.checked_add_days(Days::new(interval.days as u64))?
.checked_add_signed(TimeDelta::nanoseconds(interval.nanoseconds))
.map(Into::into)
}
/// Subtracts given [IntervalYearMonth] to the current date.
pub fn sub_year_month(&self, interval: IntervalYearMonth) -> Option<Date> {
let naive_date = self.to_chrono_date()?;
naive_date
.checked_sub_months(Months::new(interval.months as u32))
.map(Into::into)
}
/// Subtracts given [IntervalDayTime] to the current date.
pub fn sub_day_time(&self, interval: IntervalDayTime) -> Option<Date> {
let naive_date = self.to_chrono_date()?;
naive_date
.checked_sub_days(Days::new(interval.days as u64))?
.checked_sub_signed(TimeDelta::milliseconds(interval.milliseconds as i64))
.map(Into::into)
}
/// Subtracts given [IntervalMonthDayNano] to the current date.
pub fn sub_month_day_nano(&self, interval: IntervalMonthDayNano) -> Option<Date> {
let naive_date = self.to_chrono_date()?;
naive_date
.checked_sub_months(Months::new(interval.months as u32))?
.checked_sub_days(Days::new(interval.days as u64))?
.checked_sub_signed(TimeDelta::nanoseconds(interval.nanoseconds))
.map(Into::into)
}
@@ -246,12 +281,12 @@ mod tests {
fn test_add_sub_interval() {
let date = Date::new(1000);
let interval = Interval::from_year_month(3);
let interval = IntervalYearMonth::new(3);
let new_date = date.add_interval(interval).unwrap();
let new_date = date.add_year_month(interval).unwrap();
assert_eq!(new_date.val(), 1091);
assert_eq!(date, new_date.sub_interval(interval).unwrap());
assert_eq!(date, new_date.sub_year_month(interval).unwrap());
}
#[test]

View File

@@ -13,16 +13,18 @@
// limitations under the License.
use std::fmt::{Display, Formatter, Write};
use std::time::Duration;
use chrono::{Days, LocalResult, Months, NaiveDateTime, TimeZone as ChronoTimeZone, Utc};
use chrono::{
Days, LocalResult, Months, NaiveDateTime, TimeDelta, TimeZone as ChronoTimeZone, Utc,
};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{InvalidDateStrSnafu, Result};
use crate::interval::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use crate::timezone::{get_timezone, Timezone};
use crate::util::{datetime_to_utc, format_utc_datetime};
use crate::{Date, Interval};
use crate::Date;
const DATETIME_FORMAT: &str = "%F %H:%M:%S%.f";
const DATETIME_FORMAT_WITH_TZ: &str = "%F %H:%M:%S%.f%z";
@@ -160,32 +162,66 @@ impl DateTime {
None => Utc.from_utc_datetime(&v).naive_local(),
})
}
/// Adds given Interval to the current datetime.
/// Returns None if the resulting datetime would be out of range.
pub fn add_interval(&self, interval: Interval) -> Option<Self> {
// FIXME(yingwen): remove add/sub intervals later
/// Adds given [IntervalYearMonth] to the current datetime.
pub fn add_year_month(&self, interval: IntervalYearMonth) -> Option<Self> {
let naive_datetime = self.to_chrono_datetime()?;
let (months, days, nsecs) = interval.to_month_day_nano();
let naive_datetime = naive_datetime
.checked_add_months(Months::new(months as u32))?
.checked_add_days(Days::new(days as u64))?
+ Duration::from_nanos(nsecs as u64);
Some(naive_datetime.into())
naive_datetime
.checked_add_months(Months::new(interval.months as u32))
.map(Into::into)
}
/// Subtracts given Interval to the current datetime.
/// Returns None if the resulting datetime would be out of range.
pub fn sub_interval(&self, interval: Interval) -> Option<Self> {
/// Adds given [IntervalDayTime] to the current datetime.
pub fn add_day_time(&self, interval: IntervalDayTime) -> Option<Self> {
let naive_datetime = self.to_chrono_datetime()?;
let (months, days, nsecs) = interval.to_month_day_nano();
let naive_datetime = naive_datetime
.checked_sub_months(Months::new(months as u32))?
.checked_sub_days(Days::new(days as u64))?
- Duration::from_nanos(nsecs as u64);
naive_datetime
.checked_add_days(Days::new(interval.days as u64))?
.checked_add_signed(TimeDelta::milliseconds(interval.milliseconds as i64))
.map(Into::into)
}
Some(naive_datetime.into())
/// Adds given [IntervalMonthDayNano] to the current datetime.
pub fn add_month_day_nano(&self, interval: IntervalMonthDayNano) -> Option<Self> {
let naive_datetime = self.to_chrono_datetime()?;
naive_datetime
.checked_add_months(Months::new(interval.months as u32))?
.checked_add_days(Days::new(interval.days as u64))?
.checked_add_signed(TimeDelta::nanoseconds(interval.nanoseconds))
.map(Into::into)
}
/// Subtracts given [IntervalYearMonth] to the current datetime.
pub fn sub_year_month(&self, interval: IntervalYearMonth) -> Option<Self> {
let naive_datetime = self.to_chrono_datetime()?;
naive_datetime
.checked_sub_months(Months::new(interval.months as u32))
.map(Into::into)
}
/// Subtracts given [IntervalDayTime] to the current datetime.
pub fn sub_day_time(&self, interval: IntervalDayTime) -> Option<Self> {
let naive_datetime = self.to_chrono_datetime()?;
naive_datetime
.checked_sub_days(Days::new(interval.days as u64))?
.checked_sub_signed(TimeDelta::milliseconds(interval.milliseconds as i64))
.map(Into::into)
}
/// Subtracts given [IntervalMonthDayNano] to the current datetime.
pub fn sub_month_day_nano(&self, interval: IntervalMonthDayNano) -> Option<Self> {
let naive_datetime = self.to_chrono_datetime()?;
naive_datetime
.checked_sub_months(Months::new(interval.months as u32))?
.checked_sub_days(Days::new(interval.days as u64))?
.checked_sub_signed(TimeDelta::nanoseconds(interval.nanoseconds))
.map(Into::into)
}
/// Convert to [common_time::date].
@@ -231,12 +267,12 @@ mod tests {
fn test_add_sub_interval() {
let datetime = DateTime::new(1000);
let interval = Interval::from_day_time(1, 200);
let interval = IntervalDayTime::new(1, 200);
let new_datetime = datetime.add_interval(interval).unwrap();
let new_datetime = datetime.add_day_time(interval).unwrap();
assert_eq!(new_datetime.val(), 1000 + 3600 * 24 * 1000 + 200);
assert_eq!(datetime, new_datetime.sub_interval(interval).unwrap());
assert_eq!(datetime, new_datetime.sub_day_time(interval).unwrap());
}
#[test]

View File

@@ -12,18 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::default::Default;
use std::fmt::{self, Display, Formatter, Write};
use std::hash::{Hash, Hasher};
use std::hash::Hash;
use arrow::datatypes::IntervalUnit as ArrowIntervalUnit;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use crate::duration::Duration;
use crate::error::{Result, TimestampOverflowSnafu};
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
@@ -61,268 +53,269 @@ impl From<ArrowIntervalUnit> for IntervalUnit {
}
}
/// Interval Type represents a period of time.
/// It is composed of months, days and nanoseconds.
/// 3 kinds of interval are supported: year-month, day-time and
/// month-day-nano, which will be stored in the following format.
/// Interval data format:
/// | months | days | nsecs |
/// | 4bytes | 4bytes | 8bytes |
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
pub struct Interval {
months: i32,
days: i32,
nsecs: i64,
unit: IntervalUnit,
// The `Value` type requires Serialize, Deserialize.
#[derive(
Debug, Default, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize,
)]
#[repr(C)]
pub struct IntervalYearMonth {
/// Number of months
pub months: i32,
}
// Nanosecond convert to other time unit
pub const NANOS_PER_SEC: i64 = 1_000_000_000;
pub const NANOS_PER_MILLI: i64 = 1_000_000;
pub const NANOS_PER_MICRO: i64 = 1_000;
pub const NANOS_PER_HOUR: i64 = 60 * 60 * NANOS_PER_SEC;
pub const NANOS_PER_DAY: i64 = 24 * NANOS_PER_HOUR;
pub const NANOS_PER_MONTH: i64 = 30 * NANOS_PER_DAY;
pub const DAYS_PER_MONTH: i64 = 30;
impl Interval {
/// Creates a new interval from months, days and nanoseconds.
/// Precision is nanosecond.
pub fn from_month_day_nano(months: i32, days: i32, nsecs: i64) -> Self {
Interval {
months,
days,
nsecs,
unit: IntervalUnit::MonthDayNano,
}
}
/// Creates a new interval from months.
pub fn from_year_month(months: i32) -> Self {
Interval {
months,
days: 0,
nsecs: 0,
unit: IntervalUnit::YearMonth,
}
}
/// Creates a new interval from days and milliseconds.
pub fn from_day_time(days: i32, millis: i32) -> Self {
Interval {
months: 0,
days,
nsecs: (millis as i64) * NANOS_PER_MILLI,
unit: IntervalUnit::DayTime,
}
}
pub fn to_duration(&self) -> Result<Duration> {
Ok(Duration::new_nanosecond(
self.to_nanosecond()
.try_into()
.context(TimestampOverflowSnafu)?,
))
}
/// Return a tuple(months, days, nanoseconds) from the interval.
pub fn to_month_day_nano(&self) -> (i32, i32, i64) {
(self.months, self.days, self.nsecs)
}
/// Converts the interval to nanoseconds.
pub fn to_nanosecond(&self) -> i128 {
let days = (self.days as i64) + DAYS_PER_MONTH * (self.months as i64);
(self.nsecs as i128) + (NANOS_PER_DAY as i128) * (days as i128)
}
/// Smallest interval value.
pub const MIN: Self = Self {
months: i32::MIN,
days: i32::MIN,
nsecs: i64::MIN,
unit: IntervalUnit::MonthDayNano,
};
/// Largest interval value.
pub const MAX: Self = Self {
months: i32::MAX,
days: i32::MAX,
nsecs: i64::MAX,
unit: IntervalUnit::MonthDayNano,
};
/// Returns the justified interval.
/// allows you to adjust the interval of 30-day as one month and the interval of 24-hour as one day
pub fn justified_interval(&self) -> Self {
let mut result = *self;
let extra_months_d = self.days as i64 / DAYS_PER_MONTH;
let extra_months_nsecs = self.nsecs / NANOS_PER_MONTH;
result.days -= (extra_months_d * DAYS_PER_MONTH) as i32;
result.nsecs -= extra_months_nsecs * NANOS_PER_MONTH;
let extra_days = self.nsecs / NANOS_PER_DAY;
result.nsecs -= extra_days * NANOS_PER_DAY;
result.months += extra_months_d as i32 + extra_months_nsecs as i32;
result.days += extra_days as i32;
result
}
/// Convert Interval to nanoseconds,
/// to check whether Interval is positive
pub fn is_positive(&self) -> bool {
self.to_nanosecond() > 0
}
/// is_zero
pub fn is_zero(&self) -> bool {
self.months == 0 && self.days == 0 && self.nsecs == 0
}
/// get unit
pub fn unit(&self) -> IntervalUnit {
self.unit
}
/// Multiple Interval by an integer with overflow check.
/// Returns justified Interval, or `None` if overflow occurred.
pub fn checked_mul_int<I>(&self, rhs: I) -> Option<Self>
where
I: TryInto<i32>,
{
let rhs = rhs.try_into().ok()?;
let months = self.months.checked_mul(rhs)?;
let days = self.days.checked_mul(rhs)?;
let nsecs = self.nsecs.checked_mul(rhs as i64)?;
Some(
Self {
months,
days,
nsecs,
unit: self.unit,
}
.justified_interval(),
)
}
/// Convert Interval to ISO 8601 string
pub fn to_iso8601_string(self) -> String {
IntervalFormat::from(self).to_iso8601_string()
}
/// Convert Interval to postgres verbose string
pub fn to_postgres_string(self) -> String {
IntervalFormat::from(self).to_postgres_string()
}
/// Convert Interval to sql_standard string
pub fn to_sql_standard_string(self) -> String {
IntervalFormat::from(self).to_sql_standard_string()
}
/// Interval Type and i128 [IntervalUnit::MonthDayNano] Convert
/// v consists of months(i32) | days(i32) | nsecs(i64)
pub fn from_i128(v: i128) -> Self {
Interval {
nsecs: v as i64,
days: (v >> 64) as i32,
months: (v >> 96) as i32,
unit: IntervalUnit::MonthDayNano,
}
}
/// `Interval` Type and i64 [IntervalUnit::DayTime] Convert
/// v consists of days(i32) | milliseconds(i32)
pub fn from_i64(v: i64) -> Self {
Interval {
nsecs: ((v as i32) as i64) * NANOS_PER_MILLI,
days: (v >> 32) as i32,
months: 0,
unit: IntervalUnit::DayTime,
}
}
/// `Interval` Type and i32 [IntervalUnit::YearMonth] Convert
/// v consists of months(i32)
pub fn from_i32(v: i32) -> Self {
Interval {
nsecs: 0,
days: 0,
months: v,
unit: IntervalUnit::YearMonth,
}
}
pub fn to_i128(&self) -> i128 {
// 128 96 64 0
// +-------+-------+-------+-------+-------+-------+-------+-------+
// | months | days | nanoseconds |
// +-------+-------+-------+-------+-------+-------+-------+-------+
let months = (self.months as u128 & u32::MAX as u128) << 96;
let days = (self.days as u128 & u32::MAX as u128) << 64;
let nsecs = self.nsecs as u128 & u64::MAX as u128;
(months | days | nsecs) as i128
}
pub fn to_i64(&self) -> i64 {
// 64 32 0
// +-------+-------+-------+-------+-------+-------+-------+-------+
// | days | milliseconds |
// +-------+-------+-------+-------+-------+-------+-------+-------+
let days = (self.days as u64 & u32::MAX as u64) << 32;
let milliseconds = (self.nsecs / NANOS_PER_MILLI) as u64 & u32::MAX as u64;
(days | milliseconds) as i64
impl IntervalYearMonth {
pub fn new(months: i32) -> Self {
Self { months }
}
pub fn to_i32(&self) -> i32 {
self.months
}
pub fn from_i32(months: i32) -> Self {
Self { months }
}
pub fn negative(&self) -> Self {
Self {
months: -self.months,
days: -self.days,
nsecs: -self.nsecs,
unit: self.unit,
Self::new(-self.months)
}
pub fn to_iso8601_string(&self) -> String {
IntervalFormat::from(*self).to_iso8601_string()
}
}
impl From<IntervalYearMonth> for IntervalFormat {
fn from(interval: IntervalYearMonth) -> Self {
IntervalFormat {
years: interval.months / 12,
months: interval.months % 12,
..Default::default()
}
}
}
impl From<i128> for Interval {
impl From<i32> for IntervalYearMonth {
fn from(v: i32) -> Self {
Self::from_i32(v)
}
}
impl From<IntervalYearMonth> for i32 {
fn from(v: IntervalYearMonth) -> Self {
v.to_i32()
}
}
impl From<IntervalYearMonth> for serde_json::Value {
fn from(v: IntervalYearMonth) -> Self {
serde_json::Value::from(v.to_i32())
}
}
#[derive(
Debug, Default, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize,
)]
#[repr(C)]
pub struct IntervalDayTime {
/// Number of days
pub days: i32,
/// Number of milliseconds
pub milliseconds: i32,
}
impl IntervalDayTime {
/// The additive identity i.e. `0`.
pub const ZERO: Self = Self::new(0, 0);
/// The multiplicative inverse, i.e. `-1`.
pub const MINUS_ONE: Self = Self::new(-1, -1);
/// The maximum value that can be represented
pub const MAX: Self = Self::new(i32::MAX, i32::MAX);
/// The minimum value that can be represented
pub const MIN: Self = Self::new(i32::MIN, i32::MIN);
pub const fn new(days: i32, milliseconds: i32) -> Self {
Self { days, milliseconds }
}
pub fn to_i64(&self) -> i64 {
let d = (self.days as u64 & u32::MAX as u64) << 32;
let m = self.milliseconds as u64 & u32::MAX as u64;
(d | m) as i64
}
pub fn from_i64(value: i64) -> Self {
let days = (value >> 32) as i32;
let milliseconds = value as i32;
Self { days, milliseconds }
}
pub fn negative(&self) -> Self {
Self::new(-self.days, -self.milliseconds)
}
pub fn to_iso8601_string(&self) -> String {
IntervalFormat::from(*self).to_iso8601_string()
}
pub fn as_millis(&self) -> i64 {
self.days as i64 * MS_PER_DAY + self.milliseconds as i64
}
}
impl From<i64> for IntervalDayTime {
fn from(v: i64) -> Self {
Self::from_i64(v)
}
}
impl From<IntervalDayTime> for i64 {
fn from(v: IntervalDayTime) -> Self {
v.to_i64()
}
}
impl From<IntervalDayTime> for serde_json::Value {
fn from(v: IntervalDayTime) -> Self {
serde_json::Value::from(v.to_i64())
}
}
// Millisecond convert to other time unit
pub const MS_PER_SEC: i64 = 1_000;
pub const MS_PER_MINUTE: i64 = 60 * MS_PER_SEC;
pub const MS_PER_HOUR: i64 = 60 * MS_PER_MINUTE;
pub const MS_PER_DAY: i64 = 24 * MS_PER_HOUR;
pub const NANOS_PER_MILLI: i64 = 1_000_000;
impl From<IntervalDayTime> for IntervalFormat {
fn from(interval: IntervalDayTime) -> Self {
IntervalFormat {
days: interval.days,
hours: interval.milliseconds as i64 / MS_PER_HOUR,
minutes: (interval.milliseconds as i64 % MS_PER_HOUR) / MS_PER_MINUTE,
seconds: (interval.milliseconds as i64 % MS_PER_MINUTE) / MS_PER_SEC,
microseconds: (interval.milliseconds as i64 % MS_PER_SEC) * MS_PER_SEC,
..Default::default()
}
}
}
#[derive(
Debug, Default, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize,
)]
#[repr(C)]
pub struct IntervalMonthDayNano {
/// Number of months
pub months: i32,
/// Number of days
pub days: i32,
/// Number of nanoseconds
pub nanoseconds: i64,
}
impl IntervalMonthDayNano {
/// The additive identity i.e. `0`.
pub const ZERO: Self = Self::new(0, 0, 0);
/// The multiplicative inverse, i.e. `-1`.
pub const MINUS_ONE: Self = Self::new(-1, -1, -1);
/// The maximum value that can be represented
pub const MAX: Self = Self::new(i32::MAX, i32::MAX, i64::MAX);
/// The minimum value that can be represented
pub const MIN: Self = Self::new(i32::MIN, i32::MIN, i64::MIN);
pub const fn new(months: i32, days: i32, nanoseconds: i64) -> Self {
Self {
months,
days,
nanoseconds,
}
}
pub fn to_i128(&self) -> i128 {
let m = (self.months as u128 & u32::MAX as u128) << 96;
let d = (self.days as u128 & u32::MAX as u128) << 64;
let n = self.nanoseconds as u128 & u64::MAX as u128;
(m | d | n) as i128
}
pub fn from_i128(value: i128) -> Self {
let months = (value >> 96) as i32;
let days = (value >> 64) as i32;
let nanoseconds = value as i64;
Self {
months,
days,
nanoseconds,
}
}
pub fn negative(&self) -> Self {
Self::new(-self.months, -self.days, -self.nanoseconds)
}
pub fn to_iso8601_string(&self) -> String {
IntervalFormat::from(*self).to_iso8601_string()
}
}
impl From<i128> for IntervalMonthDayNano {
fn from(v: i128) -> Self {
Self::from_i128(v)
}
}
impl From<Interval> for i128 {
fn from(v: Interval) -> Self {
impl From<IntervalMonthDayNano> for i128 {
fn from(v: IntervalMonthDayNano) -> Self {
v.to_i128()
}
}
impl From<Interval> for serde_json::Value {
fn from(v: Interval) -> Self {
Value::String(v.to_string())
impl From<IntervalMonthDayNano> for serde_json::Value {
fn from(v: IntervalMonthDayNano) -> Self {
serde_json::Value::from(v.to_i128().to_string())
}
}
impl Display for Interval {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut s = String::new();
if self.months != 0 {
write!(s, "{} months ", self.months)?;
// Nanosecond convert to other time unit
pub const NS_PER_SEC: i64 = 1_000_000_000;
pub const NS_PER_MINUTE: i64 = 60 * NS_PER_SEC;
pub const NS_PER_HOUR: i64 = 60 * NS_PER_MINUTE;
pub const NS_PER_DAY: i64 = 24 * NS_PER_HOUR;
impl From<IntervalMonthDayNano> for IntervalFormat {
fn from(interval: IntervalMonthDayNano) -> Self {
IntervalFormat {
years: interval.months / 12,
months: interval.months % 12,
days: interval.days,
hours: interval.nanoseconds / NS_PER_HOUR,
minutes: (interval.nanoseconds % NS_PER_HOUR) / NS_PER_MINUTE,
seconds: (interval.nanoseconds % NS_PER_MINUTE) / NS_PER_SEC,
microseconds: (interval.nanoseconds % NS_PER_SEC) / 1_000,
}
if self.days != 0 {
write!(s, "{} days ", self.days)?;
}
if self.nsecs != 0 {
write!(s, "{} nsecs", self.nsecs)?;
}
write!(f, "{}", s.trim())
}
}
pub fn interval_year_month_to_month_day_nano(interval: IntervalYearMonth) -> IntervalMonthDayNano {
IntervalMonthDayNano {
months: interval.months,
days: 0,
nanoseconds: 0,
}
}
pub fn interval_day_time_to_month_day_nano(interval: IntervalDayTime) -> IntervalMonthDayNano {
IntervalMonthDayNano {
months: 0,
days: interval.days,
nanoseconds: interval.milliseconds as i64 * NANOS_PER_MILLI,
}
}
@@ -339,31 +332,6 @@ pub struct IntervalFormat {
pub microseconds: i64,
}
impl From<Interval> for IntervalFormat {
fn from(val: Interval) -> IntervalFormat {
let months = val.months;
let days = val.days;
let microseconds = val.nsecs / NANOS_PER_MICRO;
let years = (months - (months % 12)) / 12;
let months = months - years * 12;
let hours = (microseconds - (microseconds % 3_600_000_000)) / 3_600_000_000;
let microseconds = microseconds - hours * 3_600_000_000;
let minutes = (microseconds - (microseconds % 60_000_000)) / 60_000_000;
let microseconds = microseconds - minutes * 60_000_000;
let seconds = (microseconds - (microseconds % 1_000_000)) / 1_000_000;
let microseconds = microseconds - seconds * 1_000_000;
IntervalFormat {
years,
months,
days,
hours,
minutes,
seconds,
microseconds,
}
}
}
impl IntervalFormat {
/// All the field in the interval is 0
pub fn is_zero(&self) -> bool {
@@ -540,117 +508,37 @@ fn get_time_part(
interval
}
/// IntervalCompare is used to compare two intervals
/// It makes interval into nanoseconds style.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord)]
struct IntervalCompare(i128);
impl From<Interval> for IntervalCompare {
fn from(interval: Interval) -> Self {
Self(interval.to_nanosecond())
}
}
impl Ord for Interval {
fn cmp(&self, other: &Self) -> Ordering {
IntervalCompare::from(*self).cmp(&IntervalCompare::from(*other))
}
}
impl PartialOrd for Interval {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for Interval {}
impl PartialEq for Interval {
fn eq(&self, other: &Self) -> bool {
self.cmp(other).is_eq()
}
}
impl Hash for Interval {
fn hash<H: Hasher>(&self, state: &mut H) {
IntervalCompare::from(*self).hash(state)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use crate::timestamp::TimeUnit;
#[test]
fn test_from_year_month() {
let interval = Interval::from_year_month(1);
let interval = IntervalYearMonth::new(1);
assert_eq!(interval.months, 1);
}
#[test]
fn test_from_date_time() {
let interval = Interval::from_day_time(1, 2);
let interval = IntervalDayTime::new(1, 2);
assert_eq!(interval.days, 1);
assert_eq!(interval.nsecs, 2_000_000);
assert_eq!(interval.milliseconds, 2);
}
#[test]
fn test_to_duration() {
let interval = Interval::from_day_time(1, 2);
let duration = interval.to_duration().unwrap();
assert_eq!(86400002000000, duration.value());
assert_eq!(TimeUnit::Nanosecond, duration.unit());
let interval = Interval::from_year_month(12);
let duration = interval.to_duration().unwrap();
assert_eq!(31104000000000000, duration.value());
assert_eq!(TimeUnit::Nanosecond, duration.unit());
}
#[test]
fn test_interval_is_positive() {
let interval = Interval::from_year_month(1);
assert!(interval.is_positive());
let interval = Interval::from_year_month(-1);
assert!(!interval.is_positive());
let interval = Interval::from_day_time(1, i32::MIN);
assert!(!interval.is_positive());
}
#[test]
fn test_to_nanosecond() {
let interval = Interval::from_year_month(1);
assert_eq!(interval.to_nanosecond(), 2592000000000000);
let interval = Interval::from_day_time(1, 2);
assert_eq!(interval.to_nanosecond(), 86400002000000);
let max_interval = Interval::from_month_day_nano(i32::MAX, i32::MAX, i64::MAX);
assert_eq!(max_interval.to_nanosecond(), 5751829423496836854775807);
let min_interval = Interval::from_month_day_nano(i32::MIN, i32::MIN, i64::MIN);
assert_eq!(min_interval.to_nanosecond(), -5751829426175236854775808);
}
#[test]
fn test_interval_is_zero() {
let interval = Interval::from_month_day_nano(1, 1, 1);
assert!(!interval.is_zero());
let interval = Interval::from_month_day_nano(0, 0, 0);
assert!(interval.is_zero());
fn test_from_month_day_nano() {
let interval = IntervalMonthDayNano::new(1, 2, 3);
assert_eq!(interval.months, 1);
assert_eq!(interval.days, 2);
assert_eq!(interval.nanoseconds, 3);
}
#[test]
fn test_interval_i128_convert() {
let test_interval_eq = |month, day, nano| {
let interval = Interval::from_month_day_nano(month, day, nano);
let interval = IntervalMonthDayNano::new(month, day, nano);
let interval_i128 = interval.to_i128();
let interval2 = Interval::from_i128(interval_i128);
let interval2 = IntervalMonthDayNano::from_i128(interval_i128);
assert_eq!(interval, interval2);
};
@@ -666,11 +554,26 @@ mod tests {
test_interval_eq(i32::MAX, i32::MIN, i64::MIN);
test_interval_eq(i32::MIN, i32::MAX, i64::MIN);
test_interval_eq(i32::MIN, i32::MIN, i64::MIN);
let interval = IntervalMonthDayNano::from_i128(1);
assert_eq!(interval, IntervalMonthDayNano::new(0, 0, 1));
assert_eq!(1, IntervalMonthDayNano::new(0, 0, 1).to_i128());
}
#[test]
fn test_interval_i64_convert() {
let interval = IntervalDayTime::from_i64(1);
assert_eq!(interval, IntervalDayTime::new(0, 1));
assert_eq!(1, IntervalDayTime::new(0, 1).to_i64());
}
#[test]
fn test_convert_interval_format() {
let interval = Interval::from_month_day_nano(14, 160, 1000000);
let interval = IntervalMonthDayNano {
months: 14,
days: 160,
nanoseconds: 1000000,
};
let interval_format = IntervalFormat::from(interval);
assert_eq!(interval_format.years, 1);
assert_eq!(interval_format.months, 2);
@@ -681,94 +584,34 @@ mod tests {
assert_eq!(interval_format.microseconds, 1000);
}
#[test]
fn test_interval_hash() {
let interval = Interval::from_month_day_nano(1, 31, 1);
let interval2 = Interval::from_month_day_nano(2, 1, 1);
let mut map = HashMap::new();
map.insert(interval, 1);
assert_eq!(map.get(&interval2), Some(&1));
}
#[test]
fn test_interval_mul_int() {
let interval = Interval::from_month_day_nano(1, 1, 1);
let interval2 = interval.checked_mul_int(2).unwrap();
assert_eq!(interval2.months, 2);
assert_eq!(interval2.days, 2);
assert_eq!(interval2.nsecs, 2);
// test justified interval
let interval = Interval::from_month_day_nano(1, 31, 1);
let interval2 = interval.checked_mul_int(2).unwrap();
assert_eq!(interval2.months, 4);
assert_eq!(interval2.days, 2);
assert_eq!(interval2.nsecs, 2);
// test overflow situation
let interval = Interval::from_month_day_nano(i32::MAX, 1, 1);
let interval2 = interval.checked_mul_int(2);
assert!(interval2.is_none());
}
#[test]
fn test_display() {
let interval = Interval::from_month_day_nano(1, 1, 1);
assert_eq!(interval.to_string(), "1 months 1 days 1 nsecs");
let interval = Interval::from_month_day_nano(14, 31, 10000000000);
assert_eq!(interval.to_string(), "14 months 31 days 10000000000 nsecs");
}
#[test]
fn test_interval_justified() {
let interval = Interval::from_month_day_nano(1, 131, 1).justified_interval();
let interval2 = Interval::from_month_day_nano(5, 11, 1);
assert_eq!(interval, interval2);
let interval = Interval::from_month_day_nano(1, 1, NANOS_PER_MONTH + 2 * NANOS_PER_DAY)
.justified_interval();
let interval2 = Interval::from_month_day_nano(2, 3, 0);
assert_eq!(interval, interval2);
}
#[test]
fn test_serde_json() {
let interval = Interval::from_month_day_nano(1, 1, 1);
let json = serde_json::to_string(&interval).unwrap();
assert_eq!(
json,
"{\"months\":1,\"days\":1,\"nsecs\":1,\"unit\":\"MonthDayNano\"}"
);
let interval2: Interval = serde_json::from_str(&json).unwrap();
assert_eq!(interval, interval2);
}
#[test]
fn test_to_iso8601_string() {
// Test interval zero
let interval = Interval::from_month_day_nano(0, 0, 0);
let interval = IntervalMonthDayNano::new(0, 0, 0);
assert_eq!(interval.to_iso8601_string(), "PT0S");
let interval = Interval::from_month_day_nano(1, 1, 1);
let interval = IntervalMonthDayNano::new(1, 1, 1);
assert_eq!(interval.to_iso8601_string(), "P0Y1M1DT0H0M0S");
let interval = Interval::from_month_day_nano(14, 31, 10000000000);
let interval = IntervalMonthDayNano::new(14, 31, 10000000000);
assert_eq!(interval.to_iso8601_string(), "P1Y2M31DT0H0M10S");
let interval = Interval::from_month_day_nano(14, 31, 23210200000000);
let interval = IntervalMonthDayNano::new(14, 31, 23210200000000);
assert_eq!(interval.to_iso8601_string(), "P1Y2M31DT6H26M50.2S");
}
#[test]
fn test_to_postgres_string() {
// Test interval zero
let interval = Interval::from_month_day_nano(0, 0, 0);
assert_eq!(interval.to_postgres_string(), "00:00:00");
let interval = Interval::from_month_day_nano(23, 100, 23210200000000);
let interval = IntervalMonthDayNano::new(0, 0, 0);
assert_eq!(
interval.to_postgres_string(),
IntervalFormat::from(interval).to_postgres_string(),
"00:00:00"
);
let interval = IntervalMonthDayNano::new(23, 100, 23210200000000);
assert_eq!(
IntervalFormat::from(interval).to_postgres_string(),
"1 year 11 mons 100 days 06:26:50.200000"
);
}
@@ -776,18 +619,21 @@ mod tests {
#[test]
fn test_to_sql_standard_string() {
// Test zero interval
let interval = Interval::from_month_day_nano(0, 0, 0);
assert_eq!(interval.to_sql_standard_string(), "0");
let interval = IntervalMonthDayNano::new(0, 0, 0);
assert_eq!(IntervalFormat::from(interval).to_sql_standard_string(), "0");
let interval = Interval::from_month_day_nano(23, 100, 23210200000000);
let interval = IntervalMonthDayNano::new(23, 100, 23210200000000);
assert_eq!(
interval.to_sql_standard_string(),
IntervalFormat::from(interval).to_sql_standard_string(),
"+1-11 +100 +6:26:50.200000"
);
// Test interval without year, month, day
let interval = Interval::from_month_day_nano(0, 0, 23210200000000);
assert_eq!(interval.to_sql_standard_string(), "6:26:50.200000");
let interval = IntervalMonthDayNano::new(0, 0, 23210200000000);
assert_eq!(
IntervalFormat::from(interval).to_sql_standard_string(),
"6:26:50.200000"
);
}
#[test]

View File

@@ -27,7 +27,7 @@ pub mod util;
pub use date::Date;
pub use datetime::DateTime;
pub use duration::Duration;
pub use interval::Interval;
pub use interval::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
pub use range::RangeMillis;
pub use timestamp::Timestamp;
pub use timestamp_millis::TimestampMillis;

View File

@@ -20,16 +20,17 @@ use std::time::Duration;
use arrow::datatypes::TimeUnit as ArrowTimeUnit;
use chrono::{
DateTime, Days, LocalResult, Months, NaiveDate, NaiveDateTime, NaiveTime,
DateTime, Days, LocalResult, Months, NaiveDate, NaiveDateTime, NaiveTime, TimeDelta,
TimeZone as ChronoTimeZone, Utc,
};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use crate::error;
use crate::error::{ArithmeticOverflowSnafu, ParseTimestampSnafu, Result, TimestampOverflowSnafu};
use crate::interval::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use crate::timezone::{get_timezone, Timezone};
use crate::util::{datetime_to_utc, div_ceil};
use crate::{error, Interval};
/// Timestamp represents the value of units(seconds/milliseconds/microseconds/nanoseconds) elapsed
/// since UNIX epoch. The valid value range of [Timestamp] depends on it's unit (all in UTC timezone):
@@ -140,40 +141,77 @@ impl Timestamp {
})
}
/// Adds given Interval to the current timestamp.
/// Returns None if the resulting timestamp would be out of range.
pub fn add_interval(&self, interval: Interval) -> Option<Timestamp> {
// FIXME(yingwen): remove add/sub intervals later
/// Adds given [IntervalYearMonth] to the current timestamp.
pub fn add_year_month(&self, interval: IntervalYearMonth) -> Option<Timestamp> {
let naive_datetime = self.to_chrono_datetime()?;
let (months, days, nsecs) = interval.to_month_day_nano();
let naive_datetime = naive_datetime
.checked_add_months(Months::new(months as u32))?
.checked_add_days(Days::new(days as u64))?
+ Duration::from_nanos(nsecs as u64);
let naive_datetime =
naive_datetime.checked_add_months(Months::new(interval.months as u32))?;
match Timestamp::from_chrono_datetime(naive_datetime) {
// Have to convert the new timestamp by the current unit.
Some(ts) => ts.convert_to(self.unit),
None => None,
}
// Have to convert the new timestamp by the current unit.
Timestamp::from_chrono_datetime(naive_datetime).and_then(|ts| ts.convert_to(self.unit))
}
/// Subtracts given Interval to the current timestamp.
/// Returns None if the resulting timestamp would be out of range.
pub fn sub_interval(&self, interval: Interval) -> Option<Timestamp> {
/// Adds given [IntervalDayTime] to the current timestamp.
pub fn add_day_time(&self, interval: IntervalDayTime) -> Option<Timestamp> {
let naive_datetime = self.to_chrono_datetime()?;
let (months, days, nsecs) = interval.to_month_day_nano();
let naive_datetime = naive_datetime
.checked_sub_months(Months::new(months as u32))?
.checked_sub_days(Days::new(days as u64))?
- Duration::from_nanos(nsecs as u64);
.checked_add_days(Days::new(interval.days as u64))?
.checked_add_signed(TimeDelta::milliseconds(interval.milliseconds as i64))?;
match Timestamp::from_chrono_datetime(naive_datetime) {
// Have to convert the new timestamp by the current unit.
Some(ts) => ts.convert_to(self.unit),
None => None,
}
// Have to convert the new timestamp by the current unit.
Timestamp::from_chrono_datetime(naive_datetime).and_then(|ts| ts.convert_to(self.unit))
}
/// Adds given [IntervalMonthDayNano] to the current timestamp.
pub fn add_month_day_nano(&self, interval: IntervalMonthDayNano) -> Option<Timestamp> {
let naive_datetime = self.to_chrono_datetime()?;
let naive_datetime = naive_datetime
.checked_add_months(Months::new(interval.months as u32))?
.checked_add_days(Days::new(interval.days as u64))?
.checked_add_signed(TimeDelta::nanoseconds(interval.nanoseconds))?;
// Have to convert the new timestamp by the current unit.
Timestamp::from_chrono_datetime(naive_datetime).and_then(|ts| ts.convert_to(self.unit))
}
/// Subtracts given [IntervalYearMonth] to the current timestamp.
pub fn sub_year_month(&self, interval: IntervalYearMonth) -> Option<Timestamp> {
let naive_datetime = self.to_chrono_datetime()?;
let naive_datetime =
naive_datetime.checked_sub_months(Months::new(interval.months as u32))?;
// Have to convert the new timestamp by the current unit.
Timestamp::from_chrono_datetime(naive_datetime).and_then(|ts| ts.convert_to(self.unit))
}
/// Subtracts given [IntervalDayTime] to the current timestamp.
pub fn sub_day_time(&self, interval: IntervalDayTime) -> Option<Timestamp> {
let naive_datetime = self.to_chrono_datetime()?;
let naive_datetime = naive_datetime
.checked_sub_days(Days::new(interval.days as u64))?
.checked_sub_signed(TimeDelta::milliseconds(interval.milliseconds as i64))?;
// Have to convert the new timestamp by the current unit.
Timestamp::from_chrono_datetime(naive_datetime).and_then(|ts| ts.convert_to(self.unit))
}
/// Subtracts given [IntervalMonthDayNano] to the current timestamp.
pub fn sub_month_day_nano(&self, interval: IntervalMonthDayNano) -> Option<Timestamp> {
let naive_datetime = self.to_chrono_datetime()?;
let naive_datetime = naive_datetime
.checked_sub_months(Months::new(interval.months as u32))?
.checked_sub_days(Days::new(interval.days as u64))?
.checked_sub_signed(TimeDelta::nanoseconds(interval.nanoseconds))?;
// Have to convert the new timestamp by the current unit.
Timestamp::from_chrono_datetime(naive_datetime).and_then(|ts| ts.convert_to(self.unit))
}
/// Subtracts current timestamp with another timestamp, yielding a duration.
@@ -688,13 +726,13 @@ mod tests {
fn test_add_sub_interval() {
let ts = Timestamp::new(1000, TimeUnit::Millisecond);
let interval = Interval::from_day_time(1, 200);
let interval = IntervalDayTime::new(1, 200);
let new_ts = ts.add_interval(interval).unwrap();
let new_ts = ts.add_day_time(interval).unwrap();
assert_eq!(new_ts.unit(), TimeUnit::Millisecond);
assert_eq!(new_ts.value(), 1000 + 3600 * 24 * 1000 + 200);
assert_eq!(ts, new_ts.sub_interval(interval).unwrap());
assert_eq!(ts, new_ts.sub_day_time(interval).unwrap());
}
#[test]

View File

@@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::interval::Interval;
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use paste::paste;
use serde::{Deserialize, Serialize};
use crate::prelude::{Scalar, Value, ValueRef};
use crate::prelude::Scalar;
use crate::scalars::ScalarRef;
use crate::types::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, WrapperType,
@@ -26,39 +25,6 @@ use crate::vectors::{IntervalDayTimeVector, IntervalMonthDayNanoVector, Interval
macro_rules! define_interval_with_unit {
($unit: ident, $native_ty: ty) => {
paste! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct [<Interval $unit>](pub Interval);
impl [<Interval $unit>] {
pub fn new(val: $native_ty) -> Self {
Self(Interval:: [<from_ $native_ty>](val))
}
}
impl Default for [<Interval $unit>] {
fn default() -> Self {
Self::new(0)
}
}
impl From<[<Interval $unit>]> for Value {
fn from(t: [<Interval $unit>]) -> Value {
Value::Interval(t.0)
}
}
impl From<[<Interval $unit>]> for serde_json::Value {
fn from(t: [<Interval $unit>]) -> Self {
t.0.into()
}
}
impl From<[<Interval $unit>]> for ValueRef<'static> {
fn from(t: [<Interval $unit>]) -> Self {
ValueRef::Interval(t.0)
}
}
impl Scalar for [<Interval $unit>] {
type VectorType = [<Interval $unit Vector>];
type RefType<'a> = [<Interval $unit>];
@@ -87,41 +53,11 @@ macro_rules! define_interval_with_unit {
type Native = $native_ty;
fn from_native(value: Self::Native) -> Self {
Self::new(value)
Self::[<from_ $native_ty>](value)
}
fn into_native(self) -> Self::Native {
self.0.[<to_ $native_ty>]()
}
}
impl From<$native_ty> for [<Interval $unit>] {
fn from(val: $native_ty) -> Self {
[<Interval $unit>]::from_native(val as $native_ty)
}
}
impl From<[<Interval $unit>]> for $native_ty {
fn from(val: [<Interval $unit>]) -> Self {
val.0.[<to_ $native_ty>]()
}
}
impl TryFrom<Value> for Option<[<Interval $unit>]> {
type Error = $crate::error::Error;
#[inline]
fn try_from(from: Value) -> std::result::Result<Self, Self::Error> {
match from {
Value::Interval(v) if v.unit() == common_time::interval::IntervalUnit::$unit => {
Ok(Some([<Interval $unit>](v)))
},
Value::Null => Ok(None),
_ => $crate::error::TryFromValueSnafu {
reason: format!("{:?} is not a {}", from, stringify!([<Interval $unit>])),
}
.fail(),
}
self.[<to_ $native_ty>]()
}
}
}
@@ -138,17 +74,17 @@ mod tests {
#[test]
fn test_interval_scalar() {
let interval = IntervalYearMonth::new(1000);
let interval = IntervalYearMonth::from(1000);
assert_eq!(interval, interval.as_scalar_ref());
assert_eq!(interval, interval.to_owned_scalar());
assert_eq!(1000, interval.into_native());
let interval = IntervalDayTime::new(1000);
let interval = IntervalDayTime::from(1000);
assert_eq!(interval, interval.as_scalar_ref());
assert_eq!(interval, interval.to_owned_scalar());
assert_eq!(1000, interval.into_native());
let interval = IntervalMonthDayNano::new(1000);
let interval = IntervalMonthDayNano::from(1000);
assert_eq!(interval, interval.as_scalar_ref());
assert_eq!(interval, interval.to_owned_scalar());
assert_eq!(1000, interval.into_native());
@@ -156,15 +92,15 @@ mod tests {
#[test]
fn test_interval_convert_to_native_type() {
let interval = IntervalMonthDayNano::new(1000);
let interval = IntervalMonthDayNano::from(1000);
let native_value: i128 = interval.into();
assert_eq!(native_value, 1000);
let interval = IntervalDayTime::new(1000);
let interval = IntervalDayTime::from(1000);
let native_interval: i64 = interval.into();
assert_eq!(native_interval, 1000);
let interval = IntervalYearMonth::new(1000);
let interval = IntervalYearMonth::from(1000);
let native_interval: i32 = interval.into();
assert_eq!(native_interval, 1000);
}

View File

@@ -17,8 +17,9 @@ use arrow::datatypes::{
IntervalMonthDayNanoType as ArrowIntervalMonthDayNanoType, IntervalUnit as ArrowIntervalUnit,
IntervalYearMonthType as ArrowIntervalYearMonthType,
};
use common_time::interval::IntervalUnit;
use common_time::Interval;
use common_time::interval::{
IntervalDayTime, IntervalMonthDayNano, IntervalUnit, IntervalYearMonth,
};
use enum_dispatch::enum_dispatch;
use paste::paste;
use serde::{Deserialize, Serialize};
@@ -26,7 +27,6 @@ use snafu::OptionExt;
use crate::data_type::ConcreteDataType;
use crate::error;
use crate::interval::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use crate::prelude::{
DataType, LogicalTypeId, MutableVector, ScalarVectorBuilder, Value, ValueRef, Vector,
};
@@ -75,7 +75,7 @@ macro_rules! impl_data_type_for_interval {
}
fn default_value(&self) -> Value {
Value::Interval(Interval::from_i128(0))
Value::[<Interval $unit>]([<Interval $unit>]::default())
}
fn as_arrow_type(&self) -> ArrowDataType {
@@ -124,7 +124,7 @@ macro_rules! impl_data_type_for_interval {
fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::Interval(t) => Ok(Some([<Interval $unit>](t))),
ValueRef::[<Interval $unit>](t) => Ok(Some(t)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {:?} to {}", other, stringify!([<Interval $unit>])),
}

View File

@@ -16,7 +16,6 @@ use std::cmp::Ordering;
use std::fmt;
use arrow::datatypes::{ArrowNativeType, ArrowPrimitiveType, DataType as ArrowDataType};
use common_time::interval::IntervalUnit;
use common_time::{Date, DateTime};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
@@ -30,6 +29,7 @@ use crate::types::{DateTimeType, DateType};
use crate::value::{Value, ValueRef};
use crate::vectors::{MutableVector, PrimitiveVector, PrimitiveVectorBuilder, Vector};
// TODO(yingwen): Can we remove `Into<serde_json::Value>`?
/// Represents the wrapper type that wraps a native type using the `newtype pattern`,
/// such as [Date](`common_time::Date`) is a wrapper type for the underlying native
/// type `i32`.
@@ -364,11 +364,7 @@ impl DataType for Int64Type {
Value::DateTime(v) => Some(Value::Int64(v.val())),
Value::Timestamp(v) => Some(Value::Int64(v.value())),
Value::Time(v) => Some(Value::Int64(v.value())),
Value::Interval(v) => match v.unit() {
IntervalUnit::DayTime => Some(Value::Int64(v.to_i64())),
IntervalUnit::YearMonth => None,
IntervalUnit::MonthDayNano => None,
},
// We don't allow casting interval type to int.
_ => None,
}
}
@@ -410,11 +406,7 @@ impl DataType for Int32Type {
Value::Float64(v) => num::cast::cast(v).map(Value::Int32),
Value::String(v) => v.as_utf8().parse::<i32>().map(Value::Int32).ok(),
Value::Date(v) => Some(Value::Int32(v.val())),
Value::Interval(v) => match v.unit() {
IntervalUnit::YearMonth => Some(Value::Int32(v.to_i32())),
IntervalUnit::DayTime => None,
IntervalUnit::MonthDayNano => None,
},
// We don't allow casting interval type to int.
_ => None,
}
}

View File

@@ -78,7 +78,15 @@ impl DataType for StringType {
Value::DateTime(v) => Some(Value::String(StringBytes::from(v.to_string()))),
Value::Timestamp(v) => Some(Value::String(StringBytes::from(v.to_iso8601_string()))),
Value::Time(v) => Some(Value::String(StringBytes::from(v.to_iso8601_string()))),
Value::Interval(v) => Some(Value::String(StringBytes::from(v.to_iso8601_string()))),
Value::IntervalYearMonth(v) => {
Some(Value::String(StringBytes::from(v.to_iso8601_string())))
}
Value::IntervalDayTime(v) => {
Some(Value::String(StringBytes::from(v.to_iso8601_string())))
}
Value::IntervalMonthDayNano(v) => {
Some(Value::String(StringBytes::from(v.to_iso8601_string())))
}
Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))),
Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))),

View File

@@ -28,7 +28,7 @@ use common_time::datetime::DateTime;
use common_time::interval::IntervalUnit;
use common_time::time::Time;
use common_time::timestamp::{TimeUnit, Timestamp};
use common_time::{Duration, Interval, Timezone};
use common_time::{Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timezone};
use datafusion_common::ScalarValue;
use greptime_proto::v1::value::ValueData;
pub use ordered_float::OrderedFloat;
@@ -78,7 +78,10 @@ pub enum Value {
Timestamp(Timestamp),
Time(Time),
Duration(Duration),
Interval(Interval),
// Interval types:
IntervalYearMonth(IntervalYearMonth),
IntervalDayTime(IntervalDayTime),
IntervalMonthDayNano(IntervalMonthDayNano),
List(ListValue),
}
@@ -111,7 +114,15 @@ impl Display for Value {
Value::DateTime(v) => write!(f, "{v}"),
Value::Timestamp(v) => write!(f, "{}", v.to_iso8601_string()),
Value::Time(t) => write!(f, "{}", t.to_iso8601_string()),
Value::Interval(v) => write!(f, "{}", v.to_iso8601_string()),
Value::IntervalYearMonth(v) => {
write!(f, "{}", v.to_iso8601_string())
}
Value::IntervalDayTime(v) => {
write!(f, "{}", v.to_iso8601_string())
}
Value::IntervalMonthDayNano(v) => {
write!(f, "{}", v.to_iso8601_string())
}
Value::Duration(d) => write!(f, "{d}"),
Value::List(v) => {
let items = v
@@ -153,7 +164,15 @@ macro_rules! define_data_type_func {
$struct::DateTime(_) => ConcreteDataType::datetime_datatype(),
$struct::Time(t) => ConcreteDataType::time_datatype(*t.unit()),
$struct::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()),
$struct::Interval(v) => ConcreteDataType::interval_datatype(v.unit()),
$struct::IntervalYearMonth(_) => {
ConcreteDataType::interval_datatype(IntervalUnit::YearMonth)
}
$struct::IntervalDayTime(_) => {
ConcreteDataType::interval_datatype(IntervalUnit::DayTime)
}
$struct::IntervalMonthDayNano(_) => {
ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano)
}
$struct::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()),
$struct::Duration(d) => ConcreteDataType::duration_datatype(d.unit()),
$struct::Decimal128(d) => {
@@ -206,7 +225,9 @@ impl Value {
Value::List(v) => ValueRef::List(ListValueRef::Ref { val: v }),
Value::Timestamp(v) => ValueRef::Timestamp(*v),
Value::Time(v) => ValueRef::Time(*v),
Value::Interval(v) => ValueRef::Interval(*v),
Value::IntervalYearMonth(v) => ValueRef::IntervalYearMonth(*v),
Value::IntervalDayTime(v) => ValueRef::IntervalDayTime(*v),
Value::IntervalMonthDayNano(v) => ValueRef::IntervalMonthDayNano(*v),
Value::Duration(v) => ValueRef::Duration(*v),
Value::Decimal128(v) => ValueRef::Decimal128(*v),
}
@@ -220,14 +241,6 @@ impl Value {
}
}
/// Cast Value to Interval. Return None if value is not a valid interval data type.
pub fn as_interval(&self) -> Option<Interval> {
match self {
Value::Interval(i) => Some(*i),
_ => None,
}
}
/// Cast Value to utf8 String. Return None if value is not a valid string data type.
pub fn as_string(&self) -> Option<String> {
match self {
@@ -255,12 +268,35 @@ impl Value {
/// Cast Value to [Time]. Return None if value is not a valid time data type.
pub fn as_time(&self) -> Option<Time> {
match self {
Value::Int64(v) => Some(Time::new_millisecond(*v)),
Value::Time(t) => Some(*t),
_ => None,
}
}
/// Cast Value to [IntervalYearMonth]. Return None if value is not a valid interval year month data type.
pub fn as_interval_year_month(&self) -> Option<IntervalYearMonth> {
match self {
Value::IntervalYearMonth(v) => Some(*v),
_ => None,
}
}
/// Cast Value to [IntervalDayTime]. Return None if value is not a valid interval day time data type.
pub fn as_interval_day_time(&self) -> Option<IntervalDayTime> {
match self {
Value::IntervalDayTime(v) => Some(*v),
_ => None,
}
}
/// Cast Value to [IntervalMonthDayNano]. Return None if value is not a valid interval month day nano data type.
pub fn as_interval_month_day_nano(&self) -> Option<IntervalMonthDayNano> {
match self {
Value::IntervalMonthDayNano(v) => Some(*v),
_ => None,
}
}
/// Cast Value to u64. Return None if value is not a valid uint64 data type.
pub fn as_u64(&self) -> Option<u64> {
match self {
@@ -321,11 +357,9 @@ impl Value {
TimeUnit::Microsecond => LogicalTypeId::TimeMicrosecond,
TimeUnit::Nanosecond => LogicalTypeId::TimeNanosecond,
},
Value::Interval(v) => match v.unit() {
IntervalUnit::YearMonth => LogicalTypeId::IntervalYearMonth,
IntervalUnit::DayTime => LogicalTypeId::IntervalDayTime,
IntervalUnit::MonthDayNano => LogicalTypeId::IntervalMonthDayNano,
},
Value::IntervalYearMonth(_) => LogicalTypeId::IntervalYearMonth,
Value::IntervalDayTime(_) => LogicalTypeId::IntervalDayTime,
Value::IntervalMonthDayNano(_) => LogicalTypeId::IntervalMonthDayNano,
Value::Duration(d) => match d.unit() {
TimeUnit::Second => LogicalTypeId::DurationSecond,
TimeUnit::Millisecond => LogicalTypeId::DurationMillisecond,
@@ -375,11 +409,9 @@ impl Value {
}
Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value()))?,
Value::Interval(v) => match v.unit() {
IntervalUnit::YearMonth => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
IntervalUnit::DayTime => ScalarValue::IntervalDayTime(Some(v.to_i64())),
IntervalUnit::MonthDayNano => ScalarValue::IntervalMonthDayNano(Some(v.to_i128())),
},
Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some(v.to_i64())),
Value::IntervalMonthDayNano(v) => ScalarValue::IntervalMonthDayNano(Some(v.to_i128())),
Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
Value::Decimal128(d) => {
let (v, p, s) = d.to_scalar_value();
@@ -434,7 +466,9 @@ impl Value {
Value::Timestamp(x) => Some(Value::Timestamp(x.negative())),
Value::Time(x) => Some(Value::Time(x.negative())),
Value::Duration(x) => Some(Value::Duration(x.negative())),
Value::Interval(x) => Some(Value::Interval(x.negative())),
Value::IntervalYearMonth(x) => Some(Value::IntervalYearMonth(x.negative())),
Value::IntervalDayTime(x) => Some(Value::IntervalDayTime(x.negative())),
Value::IntervalMonthDayNano(x) => Some(Value::IntervalMonthDayNano(x.negative())),
Value::Binary(_) | Value::String(_) | Value::Boolean(_) | Value::List(_) => None,
}
@@ -571,16 +605,6 @@ pub fn scalar_value_to_timestamp(
}
}
/// Convert [ScalarValue] to [Interval].
pub fn scalar_value_to_interval(scalar: &ScalarValue) -> Option<Interval> {
match scalar {
ScalarValue::IntervalYearMonth(v) => v.map(Interval::from_i32),
ScalarValue::IntervalDayTime(v) => v.map(Interval::from_i64),
ScalarValue::IntervalMonthDayNano(v) => v.map(Interval::from_i128),
_ => None,
}
}
macro_rules! impl_ord_for_value_like {
($Type: ident, $left: ident, $right: ident) => {
if $left.is_null() && !$right.is_null() {
@@ -607,7 +631,9 @@ macro_rules! impl_ord_for_value_like {
($Type::DateTime(v1), $Type::DateTime(v2)) => v1.cmp(v2),
($Type::Timestamp(v1), $Type::Timestamp(v2)) => v1.cmp(v2),
($Type::Time(v1), $Type::Time(v2)) => v1.cmp(v2),
($Type::Interval(v1), $Type::Interval(v2)) => v1.cmp(v2),
($Type::IntervalYearMonth(v1), $Type::IntervalYearMonth(v2)) => v1.cmp(v2),
($Type::IntervalDayTime(v1), $Type::IntervalDayTime(v2)) => v1.cmp(v2),
($Type::IntervalMonthDayNano(v1), $Type::IntervalMonthDayNano(v2)) => v1.cmp(v2),
($Type::Duration(v1), $Type::Duration(v2)) => v1.cmp(v2),
($Type::List(v1), $Type::List(v2)) => v1.cmp(v2),
_ => panic!(
@@ -685,7 +711,9 @@ impl_try_from_value!(Date, Date);
impl_try_from_value!(Time, Time);
impl_try_from_value!(DateTime, DateTime);
impl_try_from_value!(Timestamp, Timestamp);
impl_try_from_value!(Interval, Interval);
impl_try_from_value!(IntervalYearMonth, IntervalYearMonth);
impl_try_from_value!(IntervalDayTime, IntervalDayTime);
impl_try_from_value!(IntervalMonthDayNano, IntervalMonthDayNano);
impl_try_from_value!(Duration, Duration);
impl_try_from_value!(Decimal128, Decimal128);
@@ -727,7 +755,9 @@ impl_value_from!(Date, Date);
impl_value_from!(Time, Time);
impl_value_from!(DateTime, DateTime);
impl_value_from!(Timestamp, Timestamp);
impl_value_from!(Interval, Interval);
impl_value_from!(IntervalYearMonth, IntervalYearMonth);
impl_value_from!(IntervalDayTime, IntervalDayTime);
impl_value_from!(IntervalMonthDayNano, IntervalMonthDayNano);
impl_value_from!(Duration, Duration);
impl_value_from!(String, String);
impl_value_from!(Decimal128, Decimal128);
@@ -774,7 +804,9 @@ impl TryFrom<Value> for serde_json::Value {
Value::List(v) => serde_json::to_value(v)?,
Value::Timestamp(v) => serde_json::to_value(v.value())?,
Value::Time(v) => serde_json::to_value(v.value())?,
Value::Interval(v) => serde_json::to_value(v.to_i128())?,
Value::IntervalYearMonth(v) => serde_json::to_value(v.to_i32())?,
Value::IntervalDayTime(v) => serde_json::to_value(v.to_i64())?,
Value::IntervalMonthDayNano(v) => serde_json::to_value(v.to_i128())?,
Value::Duration(v) => serde_json::to_value(v.value())?,
Value::Decimal128(v) => serde_json::to_value(v.to_string())?,
};
@@ -926,13 +958,13 @@ impl TryFrom<ScalarValue> for Value {
.unwrap_or(Value::Null),
ScalarValue::IntervalYearMonth(t) => t
.map(|x| Value::Interval(Interval::from_i32(x)))
.map(|x| Value::IntervalYearMonth(IntervalYearMonth::from_i32(x)))
.unwrap_or(Value::Null),
ScalarValue::IntervalDayTime(t) => t
.map(|x| Value::Interval(Interval::from_i64(x)))
.map(|x| Value::IntervalDayTime(IntervalDayTime::from_i64(x)))
.unwrap_or(Value::Null),
ScalarValue::IntervalMonthDayNano(t) => t
.map(|x| Value::Interval(Interval::from_i128(x)))
.map(|x| Value::IntervalMonthDayNano(IntervalMonthDayNano::from_i128(x)))
.unwrap_or(Value::Null),
ScalarValue::DurationSecond(d) => d
.map(|x| Value::Duration(Duration::new(x, TimeUnit::Second)))
@@ -987,7 +1019,9 @@ impl From<ValueRef<'_>> for Value {
ValueRef::DateTime(v) => Value::DateTime(v),
ValueRef::Timestamp(v) => Value::Timestamp(v),
ValueRef::Time(v) => Value::Time(v),
ValueRef::Interval(v) => Value::Interval(v),
ValueRef::IntervalYearMonth(v) => Value::IntervalYearMonth(v),
ValueRef::IntervalDayTime(v) => Value::IntervalDayTime(v),
ValueRef::IntervalMonthDayNano(v) => Value::IntervalMonthDayNano(v),
ValueRef::Duration(v) => Value::Duration(v),
ValueRef::List(v) => v.to_value(),
ValueRef::Decimal128(v) => Value::Decimal128(v),
@@ -1026,7 +1060,10 @@ pub enum ValueRef<'a> {
Timestamp(Timestamp),
Time(Time),
Duration(Duration),
Interval(Interval),
// Interval types:
IntervalYearMonth(IntervalYearMonth),
IntervalDayTime(IntervalDayTime),
IntervalMonthDayNano(IntervalMonthDayNano),
// Compound types:
List(ListValueRef<'a>),
@@ -1150,9 +1187,19 @@ impl<'a> ValueRef<'a> {
impl_as_for_value_ref!(self, Duration)
}
/// Cast itself to [Interval].
pub fn as_interval(&self) -> Result<Option<Interval>> {
impl_as_for_value_ref!(self, Interval)
/// Cast itself to [IntervalYearMonth].
pub fn as_interval_year_month(&self) -> Result<Option<IntervalYearMonth>> {
impl_as_for_value_ref!(self, IntervalYearMonth)
}
/// Cast itself to [IntervalDayTime].
pub fn as_interval_day_time(&self) -> Result<Option<IntervalDayTime>> {
impl_as_for_value_ref!(self, IntervalDayTime)
}
/// Cast itself to [IntervalMonthDayNano].
pub fn as_interval_month_day_nano(&self) -> Result<Option<IntervalMonthDayNano>> {
impl_as_for_value_ref!(self, IntervalMonthDayNano)
}
/// Cast itself to [ListValueRef].
@@ -1212,7 +1259,9 @@ impl_value_ref_from!(Date, Date);
impl_value_ref_from!(DateTime, DateTime);
impl_value_ref_from!(Timestamp, Timestamp);
impl_value_ref_from!(Time, Time);
impl_value_ref_from!(Interval, Interval);
impl_value_ref_from!(IntervalYearMonth, IntervalYearMonth);
impl_value_ref_from!(IntervalDayTime, IntervalDayTime);
impl_value_ref_from!(IntervalMonthDayNano, IntervalMonthDayNano);
impl_value_ref_from!(Duration, Duration);
impl_value_ref_from!(Decimal128, Decimal128);
@@ -1261,7 +1310,9 @@ impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
ValueRef::List(v) => serde_json::to_value(v)?,
ValueRef::Timestamp(v) => serde_json::to_value(v.value())?,
ValueRef::Time(v) => serde_json::to_value(v.value())?,
ValueRef::Interval(v) => serde_json::to_value(v.to_i128())?,
ValueRef::IntervalYearMonth(v) => serde_json::Value::from(v),
ValueRef::IntervalDayTime(v) => serde_json::Value::from(v),
ValueRef::IntervalMonthDayNano(v) => serde_json::Value::from(v),
ValueRef::Duration(v) => serde_json::to_value(v.value())?,
ValueRef::Decimal128(v) => serde_json::to_value(v.to_string())?,
};
@@ -1359,7 +1410,9 @@ impl<'a> ValueRef<'a> {
ValueRef::Timestamp(_) => 16,
ValueRef::Time(_) => 16,
ValueRef::Duration(_) => 16,
ValueRef::Interval(_) => 24,
ValueRef::IntervalYearMonth(_) => 4,
ValueRef::IntervalDayTime(_) => 8,
ValueRef::IntervalMonthDayNano(_) => 16,
ValueRef::Decimal128(_) => 32,
ValueRef::List(v) => match v {
ListValueRef::Indexed { vector, .. } => vector.memory_size() / vector.len(),
@@ -1428,7 +1481,9 @@ pub fn column_data_to_json(data: ValueData) -> JsonValue {
mod tests {
use arrow::datatypes::DataType as ArrowDataType;
use common_time::timezone::set_default_timezone;
use greptime_proto::v1::{Decimal128 as ProtoDecimal128, IntervalMonthDayNano};
use greptime_proto::v1::{
Decimal128 as ProtoDecimal128, IntervalMonthDayNano as ProtoIntervalMonthDayNano,
};
use num_traits::Float;
use super::*;
@@ -1525,11 +1580,13 @@ mod tests {
JsonValue::String("interval year [12]".to_string())
);
assert_eq!(
column_data_to_json(ValueData::IntervalMonthDayNanoValue(IntervalMonthDayNano {
months: 1,
days: 2,
nanoseconds: 3,
})),
column_data_to_json(ValueData::IntervalMonthDayNanoValue(
ProtoIntervalMonthDayNano {
months: 1,
days: 2,
nanoseconds: 3,
}
)),
JsonValue::String("interval month [1][2][3]".to_string())
);
assert_eq!(
@@ -1740,12 +1797,10 @@ mod tests {
ScalarValue::IntervalMonthDayNano(None).try_into().unwrap()
);
assert_eq!(
Value::Interval(Interval::from_month_day_nano(1, 1, 1)),
ScalarValue::IntervalMonthDayNano(Some(
Interval::from_month_day_nano(1, 1, 1).to_i128()
))
.try_into()
.unwrap()
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 1)),
ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::new(1, 1, 1).to_i128()))
.try_into()
.unwrap()
);
assert_eq!(
@@ -1975,9 +2030,17 @@ mod tests {
&ConcreteDataType::time_nanosecond_datatype(),
&Value::Time(Time::new_nanosecond(1)),
);
check_type_and_value(
&ConcreteDataType::interval_year_month_datatype(),
&Value::IntervalYearMonth(IntervalYearMonth::new(1)),
);
check_type_and_value(
&ConcreteDataType::interval_day_time_datatype(),
&Value::IntervalDayTime(IntervalDayTime::new(1, 2)),
);
check_type_and_value(
&ConcreteDataType::interval_month_day_nano_datatype(),
&Value::Interval(Interval::from_month_day_nano(1, 2, 3)),
&Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 2, 3)),
);
check_type_and_value(
&ConcreteDataType::duration_second_datatype(),
@@ -2160,7 +2223,9 @@ mod tests {
check_as_value_ref!(Float64, OrderedF64::from(16.0));
check_as_value_ref!(Timestamp, Timestamp::new_millisecond(1));
check_as_value_ref!(Time, Time::new_millisecond(1));
check_as_value_ref!(Interval, Interval::from_month_day_nano(1, 2, 3));
check_as_value_ref!(IntervalYearMonth, IntervalYearMonth::new(1));
check_as_value_ref!(IntervalDayTime, IntervalDayTime::new(1, 2));
check_as_value_ref!(IntervalMonthDayNano, IntervalMonthDayNano::new(1, 2, 3));
check_as_value_ref!(Duration, Duration::new_millisecond(1));
assert_eq!(
@@ -2672,9 +2737,11 @@ mod tests {
check_value_ref_size_eq(&ValueRef::DateTime(DateTime::new(1)), 8);
check_value_ref_size_eq(&ValueRef::Timestamp(Timestamp::new_millisecond(1)), 16);
check_value_ref_size_eq(&ValueRef::Time(Time::new_millisecond(1)), 16);
check_value_ref_size_eq(&ValueRef::IntervalYearMonth(IntervalYearMonth::new(1)), 4);
check_value_ref_size_eq(&ValueRef::IntervalDayTime(IntervalDayTime::new(1, 2)), 8);
check_value_ref_size_eq(
&ValueRef::Interval(Interval::from_month_day_nano(1, 2, 3)),
24,
&ValueRef::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 2, 3)),
16,
);
check_value_ref_size_eq(&ValueRef::Duration(Duration::new_millisecond(1)), 16);
check_value_ref_size_eq(

View File

@@ -421,7 +421,7 @@ mod tests {
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Duration, Interval};
use common_time::{Date, DateTime, Duration, IntervalMonthDayNano};
use super::*;
use crate::value::Value;
@@ -689,7 +689,10 @@ mod tests {
);
assert_eq!(3, vector.len());
for i in 0..vector.len() {
assert_eq!(Value::Interval(Interval::from_i128(2000)), vector.get(i));
assert_eq!(
Value::IntervalMonthDayNano(IntervalMonthDayNano::from_i128(2000)),
vector.get(i)
);
}
}

View File

@@ -560,7 +560,7 @@ fn reduce_batch_subgraph(
.get_mut(i)
.context(InternalSnafu{
reason: format!(
"Output builder should have the same length as the row, expected at most {} but got {}",
"Output builder should have the same length as the row, expected at most {} but got {}",
column_cnt - 1,
i
)
@@ -1162,7 +1162,9 @@ fn from_val_to_slice_idx(
#[cfg(test)]
mod test {
use common_time::{DateTime, Interval, Timestamp};
use std::time::Duration;
use common_time::Timestamp;
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
use hydroflow::scheduled::graph::Hydroflow;
@@ -1214,8 +1216,8 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window end
])
.into_unnamed(),
// TODO(discord9): mfp indirectly ref to key columns
@@ -1232,7 +1234,10 @@ mod test {
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_unnamed(),
),
@@ -1242,22 +1247,18 @@ mod test {
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
1_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
1_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
@@ -1278,9 +1279,9 @@ mod test {
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window start
ColumnType::new(CDT::timestamp_millisecond_datatype(), false), // window end
ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
])
.with_key(vec![1])
.with_time_index(Some(0))

View File

@@ -171,9 +171,13 @@ impl DfScalarFunction {
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RawDfScalarFn {
/// The raw bytes encoded datafusion scalar function
/// The raw bytes encoded datafusion scalar function,
/// due to substrait have too many layers of nested struct and `ScalarFunction` 's derive is different
/// for simplicity's sake
/// so we store bytes instead of `ScalarFunction` here
/// but in unit test we will still compare decoded struct(using `f_decoded` field in Debug impl)
pub(crate) f: bytes::BytesMut,
/// The input schema of the function
pub(crate) input_schema: RelationDesc,
@@ -181,6 +185,17 @@ pub struct RawDfScalarFn {
pub(crate) extensions: FunctionExtensions,
}
impl std::fmt::Debug for RawDfScalarFn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RawDfScalarFn")
.field("f", &self.f)
.field("f_decoded", &ScalarFunction::decode(&mut self.f.as_ref()))
.field("df_schema", &self.input_schema)
.field("extensions", &self.extensions)
.finish()
}
}
impl RawDfScalarFn {
pub fn from_proto(
f: &substrait::substrait_proto_df::proto::expression::ScalarFunction,

View File

@@ -16,19 +16,18 @@
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use arrow::array::{ArrayRef, BooleanArray};
use common_error::ext::BoxedError;
use common_time::timestamp::TimeUnit;
use common_time::{DateTime, Timestamp};
use common_time::Timestamp;
use datafusion_expr::Operator;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::types::cast;
use datatypes::value::Value;
use datatypes::vectors::{
BooleanVector, DateTimeVector, Helper, TimestampMillisecondVector, VectorRef,
};
use datatypes::vectors::{BooleanVector, Helper, TimestampMillisecondVector, VectorRef};
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use snafu::{ensure, OptionExt, ResultExt};
@@ -52,8 +51,8 @@ pub enum UnmaterializableFunc {
CurrentSchema,
TumbleWindow {
ts: Box<TypedExpr>,
window_size: common_time::Interval,
start_time: Option<DateTime>,
window_size: Duration,
start_time: Option<Timestamp>,
},
}
@@ -63,7 +62,8 @@ impl UnmaterializableFunc {
match self {
Self::Now => Signature {
input: smallvec![],
output: ConcreteDataType::datetime_datatype(),
// TODO(yingwen): Maybe return timestamp.
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::Now,
},
Self::CurrentSchema => Signature {
@@ -110,12 +110,12 @@ pub enum UnaryFunc {
StepTimestamp,
Cast(ConcreteDataType),
TumbleWindowFloor {
window_size: common_time::Interval,
start_time: Option<DateTime>,
window_size: Duration,
start_time: Option<Timestamp>,
},
TumbleWindowCeiling {
window_size: common_time::Interval,
start_time: Option<DateTime>,
window_size: Duration,
start_time: Option<Timestamp>,
},
}
@@ -139,8 +139,8 @@ impl UnaryFunc {
},
},
Self::StepTimestamp => Signature {
input: smallvec![ConcreteDataType::datetime_datatype()],
output: ConcreteDataType::datetime_datatype(),
input: smallvec![ConcreteDataType::timestamp_millisecond_datatype()],
output: ConcreteDataType::timestamp_millisecond_datatype(),
generic_fn: GenericFn::StepTimestamp,
},
Self::Cast(to) => Signature {
@@ -238,19 +238,19 @@ impl UnaryFunc {
}
}
Self::StepTimestamp => {
let datetime_array = get_datetime_array(&arg_col)?;
let date_array_ref = datetime_array
let timestamp_array = get_timestamp_array(&arg_col)?;
let timestamp_array_ref = timestamp_array
.as_any()
.downcast_ref::<arrow::array::Date64Array>()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(datetime_array.data_type()),
actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
}
})?;
let ret = arrow::compute::unary(date_array_ref, |arr| arr + 1);
let ret = DateTimeVector::from(ret);
let ret = arrow::compute::unary(timestamp_array_ref, |arr| arr + 1);
let ret = TimestampMillisecondVector::from(ret);
Ok(Arc::new(ret))
}
Self::Cast(to) => {
@@ -266,19 +266,19 @@ impl UnaryFunc {
window_size,
start_time,
} => {
let datetime_array = get_datetime_array(&arg_col)?;
let date_array_ref = datetime_array
let timestamp_array = get_timestamp_array(&arg_col)?;
let date_array_ref = timestamp_array
.as_any()
.downcast_ref::<arrow::array::Date64Array>()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(datetime_array.data_type()),
actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
}
})?;
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let ret = arrow::compute::unary(date_array_ref, |ts| {
get_window_start(ts, window_size, start_time)
@@ -291,19 +291,19 @@ impl UnaryFunc {
window_size,
start_time,
} => {
let datetime_array = get_datetime_array(&arg_col)?;
let date_array_ref = datetime_array
let timestamp_array = get_timestamp_array(&arg_col)?;
let date_array_ref = timestamp_array
.as_any()
.downcast_ref::<arrow::array::Date64Array>()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
.context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: ConcreteDataType::from_arrow_type(datetime_array.data_type()),
actual: ConcreteDataType::from_arrow_type(timestamp_array.data_type()),
}
})?;
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let ret = arrow::compute::unary(date_array_ref, |ts| {
get_window_start(ts, window_size, start_time) + window_size
@@ -330,19 +330,20 @@ impl UnaryFunc {
})?;
if let Some(window_size) = window_size_untyped.as_string() {
// cast as interval
cast(
let interval = cast(
Value::from(window_size),
&ConcreteDataType::interval_month_day_nano_datatype(),
&ConcreteDataType::interval_day_time_datatype(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_interval()
.as_interval_day_time()
.context(UnexpectedSnafu {
reason: "Expect window size arg to be interval after successful cast"
.to_string(),
})?
} else if let Some(interval) = window_size_untyped.as_interval() {
interval
})?;
Duration::from_millis(interval.as_millis() as u64)
} else if let Some(interval) = window_size_untyped.as_interval_day_time() {
Duration::from_millis(interval.as_millis() as u64)
} else {
InvalidQuerySnafu {
reason: format!(
@@ -357,16 +358,19 @@ impl UnaryFunc {
let start_time = match args.get(2) {
Some(start_time) => {
if let Some(value) = start_time.expr.as_literal() {
// cast as DateTime
let ret = cast(value, &ConcreteDataType::datetime_datatype())
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_datetime()
.context(UnexpectedSnafu {
reason:
"Expect start time arg to be datetime after successful cast"
.to_string(),
})?;
// cast as timestamp
let ret = cast(
value,
&ConcreteDataType::timestamp_millisecond_datatype(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_timestamp()
.context(UnexpectedSnafu {
reason:
"Expect start time arg to be timestamp after successful cast"
.to_string(),
})?;
Some(ret)
} else {
UnexpectedSnafu {
@@ -446,15 +450,15 @@ impl UnaryFunc {
}
Self::StepTimestamp => {
let ty = arg.data_type();
if let Value::DateTime(datetime) = arg {
let datetime = DateTime::from(datetime.val() + 1);
Ok(Value::from(datetime))
if let Value::Timestamp(timestamp) = arg {
let timestamp = Timestamp::new_millisecond(timestamp.value() + 1);
Ok(Value::from(timestamp))
} else if let Ok(v) = value_to_internal_ts(arg) {
let datetime = DateTime::from(v + 1);
Ok(Value::from(datetime))
let timestamp = Timestamp::new_millisecond(v + 1);
Ok(Value::from(timestamp))
} else {
TypeMismatchSnafu {
expected: ConcreteDataType::datetime_datatype(),
expected: ConcreteDataType::timestamp_millisecond_datatype(),
actual: ty,
}
.fail()?
@@ -474,8 +478,8 @@ impl UnaryFunc {
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let window_start = get_window_start(ts, window_size, start_time);
let ret = Timestamp::new_millisecond(window_start);
@@ -486,8 +490,8 @@ impl UnaryFunc {
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let start_time = start_time.map(|t| t.value());
let window_size = window_size.as_millis() as repr::Duration;
let window_start = get_window_start(ts, window_size, start_time);
let window_end = window_start + window_size;
@@ -498,21 +502,22 @@ impl UnaryFunc {
}
}
fn get_datetime_array(vector: &VectorRef) -> Result<arrow::array::ArrayRef, EvalError> {
fn get_timestamp_array(vector: &VectorRef) -> Result<arrow::array::ArrayRef, EvalError> {
let arrow_array = vector.to_arrow_array();
let datetime_array =
if *arrow_array.data_type() == ConcreteDataType::datetime_datatype().as_arrow_type() {
arrow_array
} else {
arrow::compute::cast(
&arrow_array,
&ConcreteDataType::datetime_datatype().as_arrow_type(),
)
.context(ArrowSnafu {
context: "Trying to cast to datetime in StepTimestamp",
})?
};
Ok(datetime_array)
let timestamp_array = if *arrow_array.data_type()
== ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type()
{
arrow_array
} else {
arrow::compute::cast(
&arrow_array,
&ConcreteDataType::timestamp_millisecond_datatype().as_arrow_type(),
)
.context(ArrowSnafu {
context: "Trying to cast to timestamp in StepTimestamp",
})?
};
Ok(timestamp_array)
}
fn get_window_start(
@@ -1284,7 +1289,6 @@ where
mod test {
use std::sync::Arc;
use common_time::Interval;
use datatypes::vectors::Vector;
use pretty_assertions::assert_eq;
@@ -1292,18 +1296,18 @@ mod test {
#[test]
fn test_tumble_batch() {
let datetime_vector = DateTimeVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]);
let timestamp_vector = TimestampMillisecondVector::from_vec(vec![1, 2, 10, 13, 14, 20, 25]);
let tumble_start = UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_day_time(0, 10),
window_size: Duration::from_millis(10),
start_time: None,
};
let tumble_end = UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_day_time(0, 10),
window_size: Duration::from_millis(10),
start_time: None,
};
let len = datetime_vector.len();
let batch = Batch::try_new(vec![Arc::new(datetime_vector)], len).unwrap();
let len = timestamp_vector.len();
let batch = Batch::try_new(vec![Arc::new(timestamp_vector)], len).unwrap();
let arg = ScalarExpr::Column(0);
let start = tumble_start.eval_batch(&batch, &arg).unwrap();
@@ -1459,4 +1463,17 @@ mod test {
Err(Error::InvalidQuery { .. })
);
}
#[test]
fn test_cast_int() {
let interval = cast(
Value::from("1 second"),
&ConcreteDataType::interval_day_time_datatype(),
)
.unwrap();
assert_eq!(
interval,
Value::from(common_time::IntervalDayTime::new(0, 1000))
);
}
}

View File

@@ -61,7 +61,7 @@ pub const BATCH_SIZE: usize = 32 * 16384;
/// Convert a value that is or can be converted to Datetime to internal timestamp
///
/// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`
pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
pub fn value_to_internal_ts(value: Value) -> Result<i64, EvalError> {
let is_supported_time_type = |arg: &Value| {
let ty = arg.data_type();
matches!(
@@ -76,14 +76,14 @@ pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
Value::Int64(ts) => Ok(ts),
arg if is_supported_time_type(&arg) => {
let arg_ty = arg.data_type();
let res = cast(arg, &ConcreteDataType::datetime_datatype()).context({
let res = cast(arg, &ConcreteDataType::timestamp_millisecond_datatype()).context({
CastValueSnafu {
from: arg_ty,
to: ConcreteDataType::datetime_datatype(),
to: ConcreteDataType::timestamp_millisecond_datatype(),
}
})?;
if let Value::DateTime(ts) = res {
Ok(ts.val())
if let Value::Timestamp(ts) = res {
Ok(ts.value())
} else {
unreachable!()
}

View File

@@ -156,10 +156,10 @@ mod test {
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_time::DateTime;
use datatypes::prelude::*;
use datatypes::schema::Schema;
use datatypes::vectors::VectorRef;
use datatypes::timestamp::TimestampMillisecond;
use datatypes::vectors::{TimestampMillisecondVectorBuilder, VectorRef};
use itertools::Itertools;
use prost::Message;
use query::parser::QueryLanguageParser;
@@ -202,7 +202,7 @@ mod test {
];
let schema = RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::datetime_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
]);
schemas.insert(
gid,
@@ -232,7 +232,11 @@ mod test {
let schema = vec![
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::datetime_datatype(), false),
datatypes::schema::ColumnSchema::new(
"ts",
CDT::timestamp_millisecond_datatype(),
false,
),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();
@@ -240,7 +244,11 @@ mod test {
columns.push(column);
let ts = (1..=10).collect_vec();
let column: VectorRef = Arc::new(<DateTime as Scalar>::VectorType::from_vec(ts));
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
ts.into_iter()
.map(|v| builder.push(Some(TimestampMillisecond::new(v))))
.count();
let column: VectorRef = builder.to_vector_cloned();
columns.push(column);
let schema = Arc::new(Schema::new(schema));

View File

@@ -345,9 +345,10 @@ impl TypedPlan {
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use std::time::Duration;
use bytes::BytesMut;
use common_time::{DateTime, Interval};
use common_time::{IntervalMonthDayNano, Timestamp};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use pretty_assertions::assert_eq;
@@ -398,7 +399,10 @@ mod test {
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
@@ -413,22 +417,18 @@ mod test {
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
1_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
1_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
@@ -539,7 +539,10 @@ mod test {
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
@@ -554,22 +557,18 @@ mod test {
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
1_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
1_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(1_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
@@ -686,7 +685,10 @@ mod test {
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
@@ -701,21 +703,13 @@ mod test {
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
@@ -833,7 +827,10 @@ mod test {
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
@@ -848,21 +845,13 @@ mod test {
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: None,
},
),
@@ -948,7 +937,10 @@ mod test {
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![
Some("number".to_string()),
@@ -963,22 +955,18 @@ mod test {
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: Some(DateTime::new(1625097600000)),
window_size: Duration::from_nanos(3_600_000_000_000),
start_time: Some(Timestamp::new_millisecond(
1625097600000,
)),
},
),
])
@@ -1512,7 +1500,7 @@ mod test {
.with_types(
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
ColumnType::new(ConcreteDataType::timestamp_millisecond_datatype(), false),
])
.into_named(vec![
Some("number".to_string()),
@@ -1536,7 +1524,7 @@ mod test {
true,
),ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
true,
false,
)])
.into_unnamed(),
extensions: FunctionExtensions {
@@ -1554,10 +1542,10 @@ mod test {
.unwrap(),
exprs: vec![
ScalarExpr::Literal(
Value::Interval(Interval::from_month_day_nano(0, 0, 30000000000)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(0, 0, 30000000000)),
CDT::interval_month_day_nano_datatype()
),
ScalarExpr::Column(1).cast(CDT::timestamp_millisecond_datatype())
ScalarExpr::Column(1)
],
}])
.unwrap()

View File

@@ -178,14 +178,14 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
let (days, seconds, microseconds) =
(interval.days, interval.seconds, interval.microseconds);
let millis = microseconds / 1000 + seconds * 1000;
let value_interval = common_time::Interval::from_day_time(days, millis);
let value_interval = common_time::IntervalDayTime::new(days, millis);
(
Value::Interval(value_interval),
Value::IntervalDayTime(value_interval),
CDT::interval_day_time_datatype(),
)
}
Some(LiteralType::IntervalYearToMonth(interval)) => (
Value::Interval(common_time::Interval::from_year_month(
Value::IntervalYearMonth(common_time::IntervalYearMonth::new(
interval.years * 12 + interval.months,
)),
CDT::interval_year_month_datatype(),
@@ -239,9 +239,9 @@ fn from_substrait_user_defined_type(user_defined: &UserDefined) -> Result<(Value
}
);
let i: i32 = from_bytes(&val.value)?;
let value_interval = common_time::Interval::from_year_month(i);
let value_interval = common_time::IntervalYearMonth::new(i);
(
Value::Interval(value_interval),
Value::IntervalYearMonth(value_interval),
CDT::interval_year_month_datatype(),
)
}
@@ -255,12 +255,12 @@ fn from_substrait_user_defined_type(user_defined: &UserDefined) -> Result<(Value
)
}
);
// TODO(yingwen): Datafusion may change the representation of the interval type.
let i: i128 = from_bytes(&val.value)?;
let (months, days, nsecs) = ((i >> 96) as i32, (i >> 64) as i32, i as i64);
let value_interval =
common_time::Interval::from_month_day_nano(months, days, nsecs);
let value_interval = common_time::IntervalMonthDayNano::new(months, days, nsecs);
(
Value::Interval(value_interval),
Value::IntervalMonthDayNano(value_interval),
CDT::interval_month_day_nano_datatype(),
)
}
@@ -274,11 +274,12 @@ fn from_substrait_user_defined_type(user_defined: &UserDefined) -> Result<(Value
)
}
);
// TODO(yingwen): Datafusion may change the representation of the interval type.
let i: i64 = from_bytes(&val.value)?;
let (days, millis) = ((i >> 32) as i32, i as i32);
let value_interval = common_time::Interval::from_day_time(days, millis);
let value_interval = common_time::IntervalDayTime::new(days, millis);
(
Value::Interval(value_interval),
Value::IntervalDayTime(value_interval),
CDT::interval_day_time_datatype(),
)
}

View File

@@ -16,9 +16,10 @@ use bytes::Buf;
use common_base::bytes::Bytes;
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::{Date, Duration, Interval};
use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Value;
use datatypes::types::IntervalType;
use datatypes::value::ValueRef;
use memcomparable::{Deserializer, Serializer};
use paste::paste;
@@ -117,6 +118,24 @@ impl SortField {
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i32())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i64())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i128())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::List(_) |
ConcreteDataType::Dictionary(_) |
ConcreteDataType::Null(_) => {
@@ -144,7 +163,6 @@ impl SortField {
Date, date,
DateTime, datetime,
Time, time,
Interval, interval,
Duration, duration,
Decimal128, decimal128,
Json, binary
@@ -181,6 +199,24 @@ impl SortField {
.map(|t|ty.create_timestamp(t));
Ok(Value::from(timestamp))
}
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
let interval = Option::<i32>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalYearMonth::from_i32);
Ok(Value::from(interval))
}
ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
let interval = Option::<i64>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalDayTime::from_i64);
Ok(Value::from(interval))
}
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
let interval = Option::<i128>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalMonthDayNano::from_i128);
Ok(Value::from(interval))
}
ConcreteDataType::List(l) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::List(l.clone()),
}
@@ -212,7 +248,6 @@ impl SortField {
Date, Date,
Time, Time,
DateTime, DateTime,
Interval, Interval,
Duration, Duration,
Decimal128, Decimal128
)
@@ -263,7 +298,9 @@ impl SortField {
ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
ConcreteDataType::Time(_) => 10, // i64 and 1 byte time unit
ConcreteDataType::Duration(_) => 10,
ConcreteDataType::Interval(_) => 18,
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
@@ -387,7 +424,9 @@ impl RowCodec for McmpRowCodec {
#[cfg(test)]
mod tests {
use common_base::bytes::StringBytes;
use common_time::{DateTime, Timestamp};
use common_time::{
DateTime, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use datatypes::value::Value;
use super::*;
@@ -563,6 +602,8 @@ mod tests {
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::time_millisecond_datatype(),
ConcreteDataType::duration_millisecond_datatype(),
ConcreteDataType::interval_year_month_datatype(),
ConcreteDataType::interval_day_time_datatype(),
ConcreteDataType::interval_month_day_nano_datatype(),
ConcreteDataType::decimal128_default_datatype(),
],
@@ -585,7 +626,9 @@ mod tests {
Value::Timestamp(Timestamp::new_millisecond(12)),
Value::Time(Time::new_millisecond(13)),
Value::Duration(Duration::new_millisecond(14)),
Value::Interval(Interval::from_month_day_nano(1, 1, 15)),
Value::IntervalYearMonth(IntervalYearMonth::new(1)),
Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
Value::Decimal128(Decimal128::from(16)),
],
);

View File

@@ -20,9 +20,9 @@ use arrow_schema::DataType;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use chrono::Utc;
use common_time::interval::NANOS_PER_MILLI;
use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
use common_time::timestamp::TimeUnit;
use common_time::{Interval, Timestamp, Timezone};
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
use datafusion::datasource::DefaultTableSource;
use datafusion::prelude::Column;
use datafusion::scalar::ScalarValue;
@@ -145,8 +145,6 @@ fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) ->
}
let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
let interval_to_ms =
|interval: Interval| -> i64 { (interval.to_nanosecond() / NANOS_PER_MILLI as i128) as i64 };
let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
match simplify_expr {
Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _))
@@ -161,15 +159,37 @@ fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) ->
| Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis,
Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _))
| Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000),
Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i32(v)))
}
Expr::Literal(ScalarValue::IntervalDayTime(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i64(v)))
}
Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i128(v)))
}
// We don't support interval with months as days in a month is unclear.
Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => interval
.map(|v| {
let interval = IntervalYearMonth::from_i32(v);
if interval.months != 0 {
return Err(DataFusionError::Plan(format!(
"Year or month interval is not allowed in range query: {}",
expr.display_name().unwrap_or_default()
)));
}
Ok(0)
})
.transpose()?,
Expr::Literal(ScalarValue::IntervalDayTime(interval)) => interval.map(|v| {
let interval = IntervalDayTime::from_i64(v);
interval.as_millis()
}),
Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => interval
.map(|v| {
let interval = IntervalMonthDayNano::from_i128(v);
if interval.months != 0 {
return Err(DataFusionError::Plan(format!(
"Year or month interval is not allowed in range query: {}",
expr.display_name().unwrap_or_default()
)));
}
Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
})
.transpose()?,
_ => None,
}
.ok_or_else(|| {
@@ -547,6 +567,7 @@ mod test {
use catalog::memory::MemoryCatalogManager;
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::IntervalYearMonth;
use datafusion_expr::{BinaryExpr, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -788,35 +809,29 @@ mod test {
#[test]
fn test_parse_duration_expr() {
let interval_to_ms = |interval: Interval| -> u128 {
(interval.to_nanosecond() / NANOS_PER_MILLI as i128) as u128
};
// test IntervalYearMonth
let interval = Interval::from_year_month(10);
let interval = IntervalYearMonth::new(10);
let args = vec![Expr::Literal(ScalarValue::IntervalYearMonth(Some(
interval.to_i32(),
)))];
assert_eq!(
parse_duration_expr(&args, 0).unwrap().as_millis(),
interval_to_ms(interval)
);
assert!(parse_duration_expr(&args, 0).is_err(),);
// test IntervalDayTime
let interval = Interval::from_day_time(10, 10);
let interval = IntervalDayTime::new(10, 10);
let args = vec![Expr::Literal(ScalarValue::IntervalDayTime(Some(
interval.to_i64(),
)))];
assert_eq!(
parse_duration_expr(&args, 0).unwrap().as_millis(),
interval_to_ms(interval)
parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
interval.as_millis()
);
// test IntervalMonthDayNano
let interval = Interval::from_month_day_nano(10, 10, 10);
let interval = IntervalMonthDayNano::new(0, 10, 10);
let args = vec![Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(
interval.to_i128(),
)))];
assert_eq!(
parse_duration_expr(&args, 0).unwrap().as_millis(),
interval_to_ms(interval)
parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
);
// test Duration
let args = vec![Expr::Literal(ScalarValue::Utf8(Some("1y4w".into())))];
@@ -828,25 +843,25 @@ mod test {
assert!(parse_duration_expr(&args, 10).is_err());
// test evaluate expr
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(0, 10).to_i64(),
)))),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(0, 10).to_i64(),
)))),
})];
assert_eq!(
parse_duration_expr(&args, 0).unwrap().as_millis(),
interval_to_ms(Interval::from_year_month(20))
parse_duration_expr(&args, 0).unwrap(),
Duration::from_millis(20)
);
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(0, 10).to_i64(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(0, 10).to_i64(),
)))),
})];
// test zero interval error
@@ -854,7 +869,7 @@ mod test {
// test must all be interval
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
IntervalYearMonth::new(10).to_i32(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::Time64Microsecond(Some(0)))),
@@ -907,19 +922,15 @@ mod test {
);
// test evaluate expr
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(0, 10).to_i64(),
)))),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(0, 10).to_i64(),
)))),
})];
assert_eq!(
parse_align_to(&args, 0, None).unwrap(),
// 20 month
20 * 30 * 24 * 60 * 60 * 1000
);
assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
}
#[test]
@@ -927,18 +938,18 @@ mod test {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::DurationMillisecond(Some(20)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(10, 0).to_i64(),
)))),
});
assert!(!interval_only_in_expr(&expr));
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(10, 0).to_i64(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::new(10, 0).to_i64(),
)))),
});
assert!(interval_only_in_expr(&expr));

View File

@@ -226,7 +226,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::Timestamp(v) => row_writer.write_col(
v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
)?,
Value::Interval(v) => row_writer.write_col(v.to_iso8601_string())?,
Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
Value::IntervalMonthDayNano(v) => {
row_writer.write_col(v.to_iso8601_string())?
}
Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
Value::List(_) => {
return Err(Error::Internal {

View File

@@ -21,12 +21,12 @@ use std::collections::HashMap;
use std::ops::Deref;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use common_time::Interval;
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::Schema;
use datatypes::types::TimestampType;
use datatypes::types::{IntervalType, TimestampType};
use datatypes::value::ListValue;
use pgwire::api::portal::{Format, Portal};
use pgwire::api::results::{DataRowEncoder, FieldInfo};
@@ -325,7 +325,9 @@ fn encode_array(
.iter()
.map(|v| match v {
Value::Null => Ok(None),
Value::Interval(v) => Ok(Some(PgInterval::from(*v))),
Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
}))),
@@ -443,7 +445,9 @@ pub(super) fn encode_value(
})))
}
}
Value::Interval(v) => builder.encode_field(&PgInterval::from(*v)),
Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
Value::Decimal128(v) => builder.encode_field(&v.to_string()),
Value::List(values) => encode_array(query_ctx, values, builder),
Value::Duration(_) => Err(PgWireError::ApiError(Box::new(Error::Internal {
@@ -875,9 +879,51 @@ pub(super) fn parameters_to_scalar_values(
let data = portal.parameter::<PgInterval>(idx, &client_type)?;
if let Some(server_type) = &server_type {
match server_type {
ConcreteDataType::Interval(_) => ScalarValue::IntervalMonthDayNano(
data.map(|i| Interval::from(i).to_i128()),
),
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
ScalarValue::IntervalYearMonth(
data.map(|i| {
if i.days != 0 || i.microseconds != 0 {
Err(invalid_parameter_error(
"invalid_parameter_type",
Some(format!(
"Expected: {}, found: {}",
server_type, client_type
)),
))
} else {
Ok(IntervalYearMonth::new(i.months).to_i32())
}
})
.transpose()?,
)
}
ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
ScalarValue::IntervalDayTime(
data.map(|i| {
if i.months != 0 || i.microseconds % 1000 != 0 {
Err(invalid_parameter_error(
"invalid_parameter_type",
Some(format!(
"Expected: {}, found: {}",
server_type, client_type
)),
))
} else {
Ok(IntervalDayTime::new(
i.days,
(i.microseconds / 1000) as i32,
)
.to_i64())
}
})
.transpose()?,
)
}
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
ScalarValue::IntervalMonthDayNano(
data.map(|i| IntervalMonthDayNano::from(i).to_i128()),
)
}
_ => {
return Err(invalid_parameter_error(
"invalid_parameter_type",
@@ -886,7 +932,9 @@ pub(super) fn parameters_to_scalar_values(
}
}
} else {
ScalarValue::IntervalMonthDayNano(data.map(|i| Interval::from(i).to_i128()))
ScalarValue::IntervalMonthDayNano(
data.map(|i| IntervalMonthDayNano::from(i).to_i128()),
)
}
}
&Type::BYTEA => {
@@ -1150,7 +1198,21 @@ mod test {
FieldFormat::Text,
),
FieldInfo::new(
"intervals".into(),
"interval_year_month".into(),
None,
None,
Type::INTERVAL,
FieldFormat::Text,
),
FieldInfo::new(
"interval_day_time".into(),
None,
None,
Type::INTERVAL,
FieldFormat::Text,
),
FieldInfo::new(
"interval_month_day_nano".into(),
None,
None,
Type::INTERVAL,
@@ -1214,6 +1276,8 @@ mod test {
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_datatype(TimeUnit::Second),
ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
@@ -1246,7 +1310,9 @@ mod test {
Value::Time(1001i64.into()),
Value::DateTime(1000001i64.into()),
Value::Timestamp(1000001i64.into()),
Value::Interval(1000001i128.into()),
Value::IntervalYearMonth(IntervalYearMonth::new(1)),
Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
Value::List(ListValue::new(
vec![Value::Int64(1i64)],
ConcreteDataType::int64_datatype(),

View File

@@ -15,31 +15,51 @@
use std::fmt::Display;
use bytes::{Buf, BufMut};
use common_time::Interval;
use common_time::interval::IntervalFormat;
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use pgwire::types::ToSqlText;
use postgres_types::{to_sql_checked, FromSql, IsNull, ToSql, Type};
#[derive(Debug, Clone, Copy, Default)]
pub struct PgInterval {
months: i32,
days: i32,
microseconds: i64,
pub(crate) months: i32,
pub(crate) days: i32,
pub(crate) microseconds: i64,
}
impl From<Interval> for PgInterval {
fn from(interval: Interval) -> Self {
let (months, days, nanos) = interval.to_month_day_nano();
impl From<IntervalYearMonth> for PgInterval {
fn from(interval: IntervalYearMonth) -> Self {
Self {
months,
days,
microseconds: nanos / 1000,
months: interval.months,
days: 0,
microseconds: 0,
}
}
}
impl From<PgInterval> for Interval {
impl From<IntervalDayTime> for PgInterval {
fn from(interval: IntervalDayTime) -> Self {
Self {
months: 0,
days: interval.days,
microseconds: interval.milliseconds as i64 * 1000,
}
}
}
impl From<IntervalMonthDayNano> for PgInterval {
fn from(interval: IntervalMonthDayNano) -> Self {
Self {
months: interval.months,
days: interval.days,
microseconds: interval.nanoseconds / 1000,
}
}
}
impl From<PgInterval> for IntervalMonthDayNano {
fn from(interval: PgInterval) -> Self {
Interval::from_month_day_nano(
IntervalMonthDayNano::new(
interval.months,
interval.days,
// Maybe overflow, but most scenarios ok.
@@ -56,7 +76,11 @@ impl From<PgInterval> for Interval {
impl Display for PgInterval {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", Interval::from(*self).to_postgres_string())
write!(
f,
"{}",
IntervalFormat::from(IntervalMonthDayNano::from(*self)).to_postgres_string()
)
}
}

View File

@@ -322,7 +322,9 @@ pub fn sql_value_to_value(
| Value::Timestamp(_)
| Value::Time(_)
| Value::Duration(_)
| Value::Interval(_) => match unary_op {
| Value::IntervalYearMonth(_)
| Value::IntervalDayTime(_)
| Value::IntervalMonthDayNano(_) => match unary_op {
UnaryOperator::Plus => {}
UnaryOperator::Minus => {
value = value

View File

@@ -20,25 +20,18 @@ Affected Rows: 8
SELECT ts, host, min(val) RANGE (INTERVAL '1 year') FROM host ALIGN (INTERVAL '1 year') ORDER BY host, ts;
+---------------------+-------+----------------------------------------------------------------------------+
| ts | host | MIN(host.val) RANGE IntervalMonthDayNano("950737950171172051122527404032") |
+---------------------+-------+----------------------------------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 0 |
| 1970-12-27T00:00:00 | host1 | 2 |
| 1970-01-01T00:00:00 | host2 | 4 |
| 1970-12-27T00:00:00 | host2 | 6 |
+---------------------+-------+----------------------------------------------------------------------------+
Error: 3000(PlanQuery), DataFusion error: Error during planning: Year or month interval is not allowed in range query: IntervalMonthDayNano("950737950171172051122527404032")
SELECT ts, host, min(val) RANGE (INTERVAL '1' year) FROM host ALIGN (INTERVAL '1' year) ORDER BY host, ts;
SELECT ts, host, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) ORDER BY host, ts;
+---------------------+-------+----------------------------------------------------------------------------+
| ts | host | MIN(host.val) RANGE IntervalMonthDayNano("950737950171172051122527404032") |
+---------------------+-------+----------------------------------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 0 |
| 1970-12-27T00:00:00 | host1 | 2 |
| 1970-01-01T00:00:00 | host2 | 4 |
| 1970-12-27T00:00:00 | host2 | 6 |
+---------------------+-------+----------------------------------------------------------------------------+
+---------------------+-------+------------------------------------------------------------------+
| ts | host | MIN(host.val) RANGE IntervalMonthDayNano("18446744073709551616") |
+---------------------+-------+------------------------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 0 |
| 1971-01-02T00:00:00 | host1 | 2 |
| 1970-01-01T00:00:00 | host2 | 4 |
| 1971-01-02T00:00:00 | host2 | 6 |
+---------------------+-------+------------------------------------------------------------------+
DROP TABLE host;

View File

@@ -16,6 +16,6 @@ INSERT INTO TABLE host VALUES
SELECT ts, host, min(val) RANGE (INTERVAL '1 year') FROM host ALIGN (INTERVAL '1 year') ORDER BY host, ts;
SELECT ts, host, min(val) RANGE (INTERVAL '1' year) FROM host ALIGN (INTERVAL '1' year) ORDER BY host, ts;
SELECT ts, host, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) ORDER BY host, ts;
DROP TABLE host;