fix: impl total order for Timestamp (#878)

* 1. Reimplement Eq for Timestamp
2. Add and/or for GenericRange

* chore: add test for TimestampRange with diff unit

* chore: optimize split implementation

* fix: clippy

* fix: add fast path

* fix: CR comments
This commit is contained in:
Lei, HUANG
2023-01-16 17:37:30 +08:00
committed by GitHub
parent bae0243959
commit daad38360f
8 changed files with 537 additions and 40 deletions

2
Cargo.lock generated
View File

@@ -1592,6 +1592,7 @@ name = "common-time"
version = "0.1.0"
dependencies = [
"chrono",
"rand 0.8.5",
"serde",
"serde_json",
"snafu",
@@ -7223,6 +7224,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-telemetry",
"common-time",
"datafusion",
"datafusion-common",
"datafusion-expr",

View File

@@ -9,3 +9,6 @@ chrono = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
[dev-dependencies]
rand = "0.8"

View File

@@ -12,70 +12,173 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::timestamp::TimeUnit;
use crate::timestamp_millis::TimestampMillis;
use crate::Timestamp;
/// A half-open time range.
///
/// The time range contains all timestamp `ts` that `ts >= start` and `ts < end`. It is
/// empty if `start == end`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimeRange<T> {
start: T,
end: T,
/// empty if `start >= end`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GenericRange<T> {
start: Option<T>,
end: Option<T>,
}
impl<T> TimeRange<T> {
impl<T> GenericRange<T>
where
T: Copy + PartialOrd,
{
pub fn and(&self, other: &GenericRange<T>) -> GenericRange<T> {
let start = match (self.start(), other.start()) {
(Some(l), Some(r)) => {
if l > r {
Some(*l)
} else {
Some(*r)
}
}
(Some(l), None) => Some(*l),
(None, Some(r)) => Some(*r),
(None, None) => None,
};
let end = match (self.end(), other.end()) {
(Some(l), Some(r)) => {
if l > r {
Some(*r)
} else {
Some(*l)
}
}
(Some(l), None) => Some(*l),
(None, Some(r)) => Some(*r),
(None, None) => None,
};
Self { start, end }
}
/// Compute the OR'ed range of two ranges.
/// Notice: this method does not compute the exact OR'ed operation for simplicity.
/// For example, `[1, 2)` or'ed with `[4, 5)` will produce `[1, 5)`
/// instead of `[1, 2) [4, 5)`
pub fn or(&self, other: &GenericRange<T>) -> GenericRange<T> {
if self.is_empty() {
return *other;
}
if other.is_empty() {
return *self;
}
let start = match (self.start(), other.start()) {
(Some(l), Some(r)) => {
if l > r {
Some(*r)
} else {
Some(*l)
}
}
(Some(_), None) => None,
(None, Some(_)) => None,
(None, None) => None,
};
let end = match (self.end(), other.end()) {
(Some(l), Some(r)) => {
if l > r {
Some(*l)
} else {
Some(*r)
}
}
(Some(_), None) => None,
(None, Some(_)) => None,
(None, None) => None,
};
Self { start, end }
}
}
impl<T> GenericRange<T> {
/// Creates a new range that contains timestamp in `[start, end)`.
///
/// Returns `None` if `start` > `end`.
pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<TimeRange<T>> {
pub fn new<U: PartialOrd + Into<T>>(start: U, end: U) -> Option<GenericRange<T>> {
if start <= end {
let (start, end) = (start.into(), end.into());
Some(TimeRange { start, end })
let (start, end) = (Some(start.into()), Some(end.into()));
Some(GenericRange { start, end })
} else {
None
}
}
/// Given a value, creates an empty time range that `start == end == value`.
pub fn empty_with_value<U: Clone + Into<T>>(value: U) -> TimeRange<T> {
TimeRange {
start: value.clone().into(),
end: value.into(),
pub fn empty_with_value<U: Clone + Into<T>>(value: U) -> GenericRange<T> {
GenericRange {
start: Some(value.clone().into()),
end: Some(value.into()),
}
}
pub fn min_to_max() -> GenericRange<T> {
Self {
start: None,
end: None,
}
}
/// Returns the lower bound of the range (inclusive).
#[inline]
pub fn start(&self) -> &T {
pub fn start(&self) -> &Option<T> {
&self.start
}
/// Returns the upper bound of the range (exclusive).
#[inline]
pub fn end(&self) -> &T {
pub fn end(&self) -> &Option<T> {
&self.end
}
/// Returns true if `timestamp` is contained in the range.
pub fn contains<U: PartialOrd<T>>(&self, timestamp: &U) -> bool {
*timestamp >= self.start && *timestamp < self.end
match (&self.start, &self.end) {
(Some(start), Some(end)) => *timestamp >= *start && *timestamp < *end,
(Some(start), None) => *timestamp >= *start,
(None, Some(end)) => *timestamp < *end,
(None, None) => true,
}
}
}
impl<T: PartialOrd> TimeRange<T> {
impl<T: PartialOrd> GenericRange<T> {
/// Returns true if the range contains no timestamps.
#[inline]
pub fn is_empty(&self) -> bool {
self.start >= self.end
match (&self.start, &self.end) {
(Some(start), Some(end)) => start >= end,
_ => false,
}
}
}
pub type TimestampRange = GenericRange<Timestamp>;
impl TimestampRange {
pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option<Self> {
let start = Timestamp::new(start, unit);
let end = Timestamp::new(end, unit);
Self::new(start, end)
}
}
/// Time range in milliseconds.
pub type RangeMillis = TimeRange<TimestampMillis>;
pub type RangeMillis = GenericRange<TimestampMillis>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
@@ -83,8 +186,8 @@ mod tests {
let (start, end) = (TimestampMillis::new(0), TimestampMillis::new(100));
let range = RangeMillis::new(start, end).unwrap();
assert_eq!(start, *range.start());
assert_eq!(end, *range.end());
assert_eq!(Some(start), *range.start());
assert_eq!(Some(end), *range.end());
let range2 = RangeMillis::new(0, 100).unwrap();
assert_eq!(range, range2);
@@ -96,7 +199,7 @@ mod tests {
let range = RangeMillis::empty_with_value(1024);
assert_eq!(range.start(), range.end());
assert_eq!(1024, *range.start());
assert_eq!(Some(TimestampMillis::new(1024)), *range.start());
}
#[test]
@@ -118,4 +221,93 @@ mod tests {
assert!(range.is_empty());
assert!(!range.contains(&0));
}
#[test]
fn test_range_with_diff_unit() {
let r1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap();
let r2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();
assert_eq!(r2, r1);
let r3 = TimestampRange::with_unit(0, 0, TimeUnit::Second).unwrap();
let r4 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
assert_eq!(r3, r4);
let r5 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap();
let r6 = TimestampRange::with_unit(0, 0, TimeUnit::Microsecond).unwrap();
assert_eq!(r5, r6);
let r5 = TimestampRange::with_unit(-1, 1, TimeUnit::Second).unwrap();
let r6 = TimestampRange::with_unit(-1000, 1000, TimeUnit::Millisecond).unwrap();
assert_eq!(r5, r6);
}
#[test]
fn test_range_and() {
assert_eq!(
TimestampRange::with_unit(5, 10, TimeUnit::Millisecond).unwrap(),
TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
.unwrap()
.and(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
);
let empty = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
.unwrap()
.and(&TimestampRange::with_unit(11, 20, TimeUnit::Millisecond).unwrap());
assert!(empty.is_empty());
// whatever AND'ed with empty shall return empty
let empty_and_all = empty.and(&TimestampRange::min_to_max());
assert!(empty_and_all.is_empty());
assert!(empty.and(&empty).is_empty());
assert!(empty
.and(&TimestampRange::with_unit(0, 10, TimeUnit::Millisecond).unwrap())
.is_empty());
// AND TimestampRange with different unit
let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
.unwrap()
.and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap());
assert_eq!(
TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
anded
);
let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond)
.unwrap()
.and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap());
assert_eq!(
TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(),
anded
);
}
#[test]
fn test_range_or() {
assert_eq!(
TimestampRange::with_unit(1, 20, TimeUnit::Millisecond).unwrap(),
TimestampRange::with_unit(1, 10, TimeUnit::Millisecond)
.unwrap()
.or(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap())
);
assert_eq!(
TimestampRange::min_to_max(),
TimestampRange::min_to_max().or(&TimestampRange::min_to_max())
);
let empty = TimestampRange::empty_with_value(Timestamp::new_millisecond(1)).or(
&TimestampRange::empty_with_value(Timestamp::new_millisecond(2)),
);
assert!(empty.is_empty());
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap();
assert_eq!(
TimestampRange::with_unit(-30, 0, TimeUnit::Second).unwrap(),
t1.or(&t2)
);
let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap();
assert_eq!(t1, t1.or(&t1));
}
}

View File

@@ -71,9 +71,29 @@ impl Timestamp {
self.value
}
pub fn convert_to(&self, unit: TimeUnit) -> i64 {
// TODO(hl): May result into overflow
self.value * self.unit.factor() / unit.factor()
/// Convert a timestamp to given time unit.
/// Conversion from a timestamp with smaller unit to a larger unit may cause rounding error.
/// Return `None` if conversion causes overflow.
pub fn convert_to(&self, unit: TimeUnit) -> Option<Timestamp> {
if self.unit().factor() >= unit.factor() {
let mul = self.unit().factor() / unit.factor();
let value = self.value.checked_mul(mul)?;
Some(Timestamp::new(value, unit))
} else {
let mul = unit.factor() / self.unit().factor();
Some(Timestamp::new(self.value.div_euclid(mul), unit))
}
}
/// Split a [Timestamp] into seconds part and nanoseconds part.
/// Notice the seconds part of split result is always rounded down to floor.
fn split(&self) -> (i64, i64) {
let sec_mul = TimeUnit::Second.factor() / self.unit.factor();
let nsec_mul = self.unit.factor() / TimeUnit::Nanosecond.factor();
let sec_div = self.value.div_euclid(sec_mul);
let sec_mod = self.value.rem_euclid(sec_mul);
(sec_div, sec_mod * nsec_mul)
}
/// Format timestamp to ISO8601 string. If the timestamp exceeds what chrono timestamp can
@@ -81,8 +101,7 @@ impl Timestamp {
pub fn to_iso8601_string(&self) -> String {
let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor();
let mut secs = self.convert_to(TimeUnit::Second);
let mut nsecs = self.convert_to(TimeUnit::Nanosecond) % nano_factor;
let (mut secs, mut nsecs) = self.split();
if nsecs < 0 {
secs -= 1;
@@ -227,19 +246,33 @@ impl TimeUnit {
impl PartialOrd for Timestamp {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
(self.value * self.unit.factor()).partial_cmp(&(other.value * other.unit.factor()))
Some(self.cmp(other))
}
}
/// A proper implementation of total order requires antisymmetry, reflexivity, transitivity and totality.
/// In this comparison implementation, we map a timestamp uniquely to a `(i64, i64)` tuple which is respectively
/// total order.
impl Ord for Timestamp {
fn cmp(&self, other: &Self) -> Ordering {
(self.value * self.unit.factor()).cmp(&(other.value * other.unit.factor()))
// fast path: most comparisons use the same unit.
if self.unit == other.unit {
return self.value.cmp(&other.value);
}
let (s_sec, s_nsec) = self.split();
let (o_sec, o_nsec) = other.split();
match s_sec.cmp(&o_sec) {
Ordering::Less => Ordering::Less,
Ordering::Greater => Ordering::Greater,
Ordering::Equal => s_nsec.cmp(&o_nsec),
}
}
}
impl PartialEq for Timestamp {
fn eq(&self, other: &Self) -> bool {
self.convert_to(TimeUnit::Nanosecond) == other.convert_to(TimeUnit::Nanosecond)
self.cmp(other) == Ordering::Equal
}
}
@@ -247,14 +280,18 @@ impl Eq for Timestamp {}
impl Hash for Timestamp {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_i64(self.convert_to(TimeUnit::Nanosecond));
state.finish();
let (sec, nsec) = self.split();
state.write_i64(sec);
state.write_i64(nsec);
}
}
#[cfg(test)]
mod tests {
use std::collections::hash_map::DefaultHasher;
use chrono::Offset;
use rand::Rng;
use serde_json::Value;
use super::*;
@@ -284,6 +321,162 @@ mod tests {
assert!(t > Timestamp::new(999, TimeUnit::Microsecond));
}
#[test]
fn test_timestamp_antisymmetry() {
let t1 = Timestamp::new(1, TimeUnit::Second);
let t2 = Timestamp::new(1000, TimeUnit::Millisecond);
assert!(t1 >= t2);
assert!(t2 >= t1);
assert_eq!(Ordering::Equal, t1.cmp(&t2));
}
fn gen_random_ts() -> Timestamp {
let units = [
TimeUnit::Second,
TimeUnit::Millisecond,
TimeUnit::Microsecond,
TimeUnit::Nanosecond,
];
let mut rng = rand::thread_rng();
let unit_idx: usize = rng.gen_range(0..4);
let unit = units[unit_idx];
let value: i64 = rng.gen();
Timestamp::new(value, unit)
}
#[test]
fn test_timestamp_reflexivity() {
for _ in 0..1000 {
let ts = gen_random_ts();
assert!(ts >= ts, "ts: {ts:?}");
}
}
/// Generate timestamp less than or equal to `threshold`
fn gen_ts_le(threshold: &Timestamp) -> Timestamp {
let mut rng = rand::thread_rng();
let timestamp = rng.gen_range(i64::MIN..=threshold.value);
Timestamp::new(timestamp, threshold.unit)
}
#[test]
fn test_timestamp_transitivity() {
let t0 = Timestamp::new_millisecond(100);
let t1 = gen_ts_le(&t0);
let t2 = gen_ts_le(&t1);
assert!(t0 >= t1, "t0: {t0:?}, t1: {t1:?}");
assert!(t1 >= t2, "t1: {t1:?}, t2: {t2:?}");
assert!(t0 >= t2, "t0: {t0:?}, t2: {t2:?}");
let t0 = Timestamp::new_millisecond(-100);
let t1 = gen_ts_le(&t0); // t0 >= t1
let t2 = gen_ts_le(&t1); // t1 >= t2
assert!(t0 >= t1, "t0: {t0:?}, t1: {t1:?}");
assert!(t1 >= t2, "t1: {t1:?}, t2: {t2:?}");
assert!(t0 >= t2, "t0: {t0:?}, t2: {t2:?}"); // check if t0 >= t2
}
#[test]
fn test_antisymmetry() {
let t0 = Timestamp::new(1, TimeUnit::Second);
let t1 = Timestamp::new(1000, TimeUnit::Millisecond);
assert!(t0 >= t1, "t0: {t0:?}, t1: {t1:?}");
assert!(t1 >= t0, "t0: {t0:?}, t1: {t1:?}");
assert_eq!(t1, t0, "t0: {t0:?}, t1: {t1:?}");
}
#[test]
fn test_strong_connectivity() {
let mut values = Vec::with_capacity(1000);
for _ in 0..1000 {
values.push(gen_random_ts());
}
for l in &values {
for r in &values {
assert!(l >= r || l <= r, "l: {l:?}, r: {r:?}");
}
}
}
#[test]
fn test_cmp_timestamp() {
let t1 = Timestamp::new(0, TimeUnit::Millisecond);
let t2 = Timestamp::new(0, TimeUnit::Second);
assert_eq!(t2, t1);
let t1 = Timestamp::new(1, TimeUnit::Millisecond);
let t2 = Timestamp::new(-1, TimeUnit::Second);
assert!(t1 > t2);
let t1 = Timestamp::new(i64::MAX / 1000 * 1000, TimeUnit::Millisecond);
let t2 = Timestamp::new(i64::MAX / 1000, TimeUnit::Second);
assert_eq!(t2, t1);
let t1 = Timestamp::new(i64::MAX, TimeUnit::Millisecond);
let t2 = Timestamp::new(i64::MAX / 1000 + 1, TimeUnit::Second);
assert!(t2 > t1);
let t1 = Timestamp::new(i64::MAX, TimeUnit::Millisecond);
let t2 = Timestamp::new(i64::MAX / 1000, TimeUnit::Second);
assert!(t2 < t1);
let t1 = Timestamp::new(10_010_001, TimeUnit::Millisecond);
let t2 = Timestamp::new(100, TimeUnit::Second);
assert!(t1 > t2);
let t1 = Timestamp::new(-100 * 10_001, TimeUnit::Millisecond);
let t2 = Timestamp::new(-100, TimeUnit::Second);
assert!(t2 > t1);
let t1 = Timestamp::new(i64::MIN, TimeUnit::Millisecond);
let t2 = Timestamp::new(i64::MIN / 1000 - 1, TimeUnit::Second);
assert!(t1 > t2);
let t1 = Timestamp::new(i64::MIN, TimeUnit::Millisecond);
let t2 = Timestamp::new(i64::MIN + 1, TimeUnit::Millisecond);
assert!(t2 > t1);
let t1 = Timestamp::new(i64::MAX, TimeUnit::Millisecond);
let t2 = Timestamp::new(i64::MIN, TimeUnit::Second);
assert!(t1 > t2);
let t1 = Timestamp::new(0, TimeUnit::Nanosecond);
let t2 = Timestamp::new(i64::MIN, TimeUnit::Second);
assert!(t1 > t2);
}
fn check_hash_eq(t1: Timestamp, t2: Timestamp) {
let mut hasher = DefaultHasher::new();
t1.hash(&mut hasher);
let t1_hash = hasher.finish();
let mut hasher = DefaultHasher::new();
t2.hash(&mut hasher);
let t2_hash = hasher.finish();
assert_eq!(t2_hash, t1_hash);
}
#[test]
fn test_hash() {
check_hash_eq(
Timestamp::new(0, TimeUnit::Millisecond),
Timestamp::new(0, TimeUnit::Second),
);
check_hash_eq(
Timestamp::new(1000, TimeUnit::Millisecond),
Timestamp::new(1, TimeUnit::Second),
);
check_hash_eq(
Timestamp::new(1_000_000, TimeUnit::Microsecond),
Timestamp::new(1, TimeUnit::Second),
);
check_hash_eq(
Timestamp::new(1_000_000_000, TimeUnit::Nanosecond),
Timestamp::new(1, TimeUnit::Second),
);
}
#[test]
pub fn test_from_i64() {
let t: Timestamp = 42.into();
@@ -437,4 +630,92 @@ mod tests {
}
);
}
#[test]
fn test_convert_timestamp() {
let ts = Timestamp::new(1, TimeUnit::Second);
assert_eq!(
Timestamp::new(1000, TimeUnit::Millisecond),
ts.convert_to(TimeUnit::Millisecond).unwrap()
);
assert_eq!(
Timestamp::new(1_000_000, TimeUnit::Microsecond),
ts.convert_to(TimeUnit::Microsecond).unwrap()
);
assert_eq!(
Timestamp::new(1_000_000_000, TimeUnit::Nanosecond),
ts.convert_to(TimeUnit::Nanosecond).unwrap()
);
let ts = Timestamp::new(1_000_100_100, TimeUnit::Nanosecond);
assert_eq!(
Timestamp::new(1_000_100, TimeUnit::Microsecond),
ts.convert_to(TimeUnit::Microsecond).unwrap()
);
assert_eq!(
Timestamp::new(1000, TimeUnit::Millisecond),
ts.convert_to(TimeUnit::Millisecond).unwrap()
);
assert_eq!(
Timestamp::new(1, TimeUnit::Second),
ts.convert_to(TimeUnit::Second).unwrap()
);
let ts = Timestamp::new(1_000_100_100, TimeUnit::Nanosecond);
assert_eq!(ts, ts.convert_to(TimeUnit::Nanosecond).unwrap());
let ts = Timestamp::new(1_000_100_100, TimeUnit::Microsecond);
assert_eq!(ts, ts.convert_to(TimeUnit::Microsecond).unwrap());
let ts = Timestamp::new(1_000_100_100, TimeUnit::Millisecond);
assert_eq!(ts, ts.convert_to(TimeUnit::Millisecond).unwrap());
let ts = Timestamp::new(1_000_100_100, TimeUnit::Second);
assert_eq!(ts, ts.convert_to(TimeUnit::Second).unwrap());
// -9223372036854775808 in milliseconds should be rounded up to -9223372036854776 in seconds
assert_eq!(
Timestamp::new(-9223372036854776, TimeUnit::Second),
Timestamp::new(i64::MIN, TimeUnit::Millisecond)
.convert_to(TimeUnit::Second)
.unwrap()
);
assert!(Timestamp::new(i64::MAX, TimeUnit::Second)
.convert_to(TimeUnit::Millisecond)
.is_none());
}
#[test]
fn test_split() {
assert_eq!((0, 0), Timestamp::new(0, TimeUnit::Second).split());
assert_eq!((1, 0), Timestamp::new(1, TimeUnit::Second).split());
assert_eq!(
(0, 1_000_000),
Timestamp::new(1, TimeUnit::Millisecond).split()
);
assert_eq!((0, 1_000), Timestamp::new(1, TimeUnit::Microsecond).split());
assert_eq!((0, 1), Timestamp::new(1, TimeUnit::Nanosecond).split());
assert_eq!(
(1, 1_000_000),
Timestamp::new(1001, TimeUnit::Millisecond).split()
);
assert_eq!(
(-2, 999_000_000),
Timestamp::new(-1001, TimeUnit::Millisecond).split()
);
// check min value of nanos
let (sec, nsec) = Timestamp::new(i64::MIN, TimeUnit::Nanosecond).split();
assert_eq!(
i64::MIN as i128,
sec as i128 * (TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor()) as i128
+ nsec as i128
);
assert_eq!(
(i64::MAX, 0),
Timestamp::new(i64::MAX, TimeUnit::Second).split()
);
}
}

View File

@@ -126,8 +126,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::Binary(v) => row_writer.write_col(v.deref())?,
Value::Date(v) => row_writer.write_col(v.val())?,
Value::DateTime(v) => row_writer.write_col(v.val())?,
Value::Timestamp(v) => row_writer
.write_col(DateTime::new(v.convert_to(TimeUnit::Second)).to_string())?,
Value::Timestamp(v) => row_writer.write_col(
// safety: converting timestamp with whatever unit to second will not cause overflow
DateTime::new(v.convert_to(TimeUnit::Second).unwrap().value()).to_string(),
)?,
Value::List(_) => {
return Err(Error::Internal {
err_msg: format!(

View File

@@ -15,6 +15,8 @@
use std::any::Any;
use common_error::prelude::*;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::TokenizerError;
@@ -134,6 +136,17 @@ pub enum Error {
#[snafu(display("Invalid sql value: {}", value))]
InvalidSqlValue { value: String, backtrace: Backtrace },
#[snafu(display(
"Converting timestamp {:?} to unit {:?} overflow",
timestamp,
target_unit
))]
TimestampOverflow {
timestamp: Timestamp,
target_unit: TimeUnit,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
@@ -160,6 +173,7 @@ impl ErrorExt for Error {
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),
InvalidSqlValue { .. } => StatusCode::InvalidArguments,
TimestampOverflow { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -32,7 +32,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::DateTimeType;
use datatypes::value::Value;
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use crate::ast::{
ColumnDef, ColumnOption, ColumnOptionDef, DataType as SqlDataType, Expr, ObjectName,
@@ -40,7 +40,7 @@ use crate::ast::{
};
use crate::error::{
self, ColumnTypeMismatchSnafu, ConvertToGrpcDataTypeSnafu, InvalidSqlValueSnafu,
ParseSqlValueSnafu, Result, SerializeColumnDefaultConstraintSnafu,
ParseSqlValueSnafu, Result, SerializeColumnDefaultConstraintSnafu, TimestampOverflowSnafu,
UnsupportedDefaultValueSnafu,
};
@@ -112,10 +112,12 @@ fn parse_string_to_value(
}
ConcreteDataType::Timestamp(t) => {
if let Ok(ts) = Timestamp::from_str(&s) {
Ok(Value::Timestamp(Timestamp::new(
ts.convert_to(t.unit()),
t.unit(),
)))
Ok(Value::Timestamp(ts.convert_to(t.unit()).context(
TimestampOverflowSnafu {
timestamp: ts,
target_unit: t.unit(),
},
)?))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {s} to Timestamp value"),

View File

@@ -13,6 +13,7 @@ common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true