From daad38360fa7a309024cf04bead20a1aece37e86 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 16 Jan 2023 17:37:30 +0800 Subject: [PATCH] fix: impl total order for Timestamp (#878) * 1. Reimplement Eq for Timestamp 2. Add and/or for GenericRange * chore: add test for TimestampRange with diff unit * chore: optimize split implementation * fix: clippy * fix: add fast path * fix: CR comments --- Cargo.lock | 2 + src/common/time/Cargo.toml | 3 + src/common/time/src/range.rs | 236 +++++++++++++++++++++--- src/common/time/src/timestamp.rs | 301 ++++++++++++++++++++++++++++++- src/servers/src/mysql/writer.rs | 6 +- src/sql/src/error.rs | 14 ++ src/sql/src/statements.rs | 14 +- src/table/Cargo.toml | 1 + 8 files changed, 537 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03e5f33374..8b8cb6b29f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1592,6 +1592,7 @@ name = "common-time" version = "0.1.0" dependencies = [ "chrono", + "rand 0.8.5", "serde", "serde_json", "snafu", @@ -7223,6 +7224,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-telemetry", + "common-time", "datafusion", "datafusion-common", "datafusion-expr", diff --git a/src/common/time/Cargo.toml b/src/common/time/Cargo.toml index 2ecf2d68ee..799be63531 100644 --- a/src/common/time/Cargo.toml +++ b/src/common/time/Cargo.toml @@ -9,3 +9,6 @@ chrono = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } + +[dev-dependencies] +rand = "0.8" diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index ef1935caf5..42971aa489 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -12,70 +12,173 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::timestamp::TimeUnit; use crate::timestamp_millis::TimestampMillis; +use crate::Timestamp; /// A half-open time range. /// /// The time range contains all timestamp `ts` that `ts >= start` and `ts < end`. It is -/// empty if `start == end`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct TimeRange { - start: T, - end: T, +/// empty if `start >= end`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct GenericRange { + start: Option, + end: Option, } -impl TimeRange { +impl GenericRange +where + T: Copy + PartialOrd, +{ + pub fn and(&self, other: &GenericRange) -> GenericRange { + let start = match (self.start(), other.start()) { + (Some(l), Some(r)) => { + if l > r { + Some(*l) + } else { + Some(*r) + } + } + (Some(l), None) => Some(*l), + (None, Some(r)) => Some(*r), + (None, None) => None, + }; + + let end = match (self.end(), other.end()) { + (Some(l), Some(r)) => { + if l > r { + Some(*r) + } else { + Some(*l) + } + } + (Some(l), None) => Some(*l), + (None, Some(r)) => Some(*r), + (None, None) => None, + }; + + Self { start, end } + } + + /// Compute the OR'ed range of two ranges. + /// Notice: this method does not compute the exact OR'ed operation for simplicity. + /// For example, `[1, 2)` or'ed with `[4, 5)` will produce `[1, 5)` + /// instead of `[1, 2) ∪ [4, 5)` + pub fn or(&self, other: &GenericRange) -> GenericRange { + if self.is_empty() { + return *other; + } + + if other.is_empty() { + return *self; + } + let start = match (self.start(), other.start()) { + (Some(l), Some(r)) => { + if l > r { + Some(*r) + } else { + Some(*l) + } + } + (Some(_), None) => None, + (None, Some(_)) => None, + (None, None) => None, + }; + + let end = match (self.end(), other.end()) { + (Some(l), Some(r)) => { + if l > r { + Some(*l) + } else { + Some(*r) + } + } + (Some(_), None) => None, + (None, Some(_)) => None, + (None, None) => None, + }; + + Self { start, end } + } +} + +impl GenericRange { /// Creates a new range that contains timestamp in `[start, end)`. /// /// Returns `None` if `start` > `end`. - pub fn new>(start: U, end: U) -> Option> { + pub fn new>(start: U, end: U) -> Option> { if start <= end { - let (start, end) = (start.into(), end.into()); - Some(TimeRange { start, end }) + let (start, end) = (Some(start.into()), Some(end.into())); + Some(GenericRange { start, end }) } else { None } } /// Given a value, creates an empty time range that `start == end == value`. - pub fn empty_with_value>(value: U) -> TimeRange { - TimeRange { - start: value.clone().into(), - end: value.into(), + pub fn empty_with_value>(value: U) -> GenericRange { + GenericRange { + start: Some(value.clone().into()), + end: Some(value.into()), + } + } + + pub fn min_to_max() -> GenericRange { + Self { + start: None, + end: None, } } /// Returns the lower bound of the range (inclusive). #[inline] - pub fn start(&self) -> &T { + pub fn start(&self) -> &Option { &self.start } /// Returns the upper bound of the range (exclusive). #[inline] - pub fn end(&self) -> &T { + pub fn end(&self) -> &Option { &self.end } /// Returns true if `timestamp` is contained in the range. pub fn contains>(&self, timestamp: &U) -> bool { - *timestamp >= self.start && *timestamp < self.end + match (&self.start, &self.end) { + (Some(start), Some(end)) => *timestamp >= *start && *timestamp < *end, + (Some(start), None) => *timestamp >= *start, + (None, Some(end)) => *timestamp < *end, + (None, None) => true, + } } } -impl TimeRange { +impl GenericRange { /// Returns true if the range contains no timestamps. #[inline] pub fn is_empty(&self) -> bool { - self.start >= self.end + match (&self.start, &self.end) { + (Some(start), Some(end)) => start >= end, + _ => false, + } + } +} + +pub type TimestampRange = GenericRange; +impl TimestampRange { + pub fn with_unit(start: i64, end: i64, unit: TimeUnit) -> Option { + let start = Timestamp::new(start, unit); + let end = Timestamp::new(end, unit); + Self::new(start, end) } } /// Time range in milliseconds. -pub type RangeMillis = TimeRange; +pub type RangeMillis = GenericRange; #[cfg(test)] mod tests { + use super::*; #[test] @@ -83,8 +186,8 @@ mod tests { let (start, end) = (TimestampMillis::new(0), TimestampMillis::new(100)); let range = RangeMillis::new(start, end).unwrap(); - assert_eq!(start, *range.start()); - assert_eq!(end, *range.end()); + assert_eq!(Some(start), *range.start()); + assert_eq!(Some(end), *range.end()); let range2 = RangeMillis::new(0, 100).unwrap(); assert_eq!(range, range2); @@ -96,7 +199,7 @@ mod tests { let range = RangeMillis::empty_with_value(1024); assert_eq!(range.start(), range.end()); - assert_eq!(1024, *range.start()); + assert_eq!(Some(TimestampMillis::new(1024)), *range.start()); } #[test] @@ -118,4 +221,93 @@ mod tests { assert!(range.is_empty()); assert!(!range.contains(&0)); } + + #[test] + fn test_range_with_diff_unit() { + let r1 = TimestampRange::with_unit(1, 2, TimeUnit::Second).unwrap(); + let r2 = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap(); + assert_eq!(r2, r1); + + let r3 = TimestampRange::with_unit(0, 0, TimeUnit::Second).unwrap(); + let r4 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap(); + assert_eq!(r3, r4); + + let r5 = TimestampRange::with_unit(0, 0, TimeUnit::Millisecond).unwrap(); + let r6 = TimestampRange::with_unit(0, 0, TimeUnit::Microsecond).unwrap(); + assert_eq!(r5, r6); + + let r5 = TimestampRange::with_unit(-1, 1, TimeUnit::Second).unwrap(); + let r6 = TimestampRange::with_unit(-1000, 1000, TimeUnit::Millisecond).unwrap(); + assert_eq!(r5, r6); + } + + #[test] + fn test_range_and() { + assert_eq!( + TimestampRange::with_unit(5, 10, TimeUnit::Millisecond).unwrap(), + TimestampRange::with_unit(1, 10, TimeUnit::Millisecond) + .unwrap() + .and(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap()) + ); + + let empty = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond) + .unwrap() + .and(&TimestampRange::with_unit(11, 20, TimeUnit::Millisecond).unwrap()); + assert!(empty.is_empty()); + + // whatever AND'ed with empty shall return empty + let empty_and_all = empty.and(&TimestampRange::min_to_max()); + assert!(empty_and_all.is_empty()); + assert!(empty.and(&empty).is_empty()); + assert!(empty + .and(&TimestampRange::with_unit(0, 10, TimeUnit::Millisecond).unwrap()) + .is_empty()); + + // AND TimestampRange with different unit + let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond) + .unwrap() + .and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap()); + assert_eq!( + TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(), + anded + ); + + let anded = TimestampRange::with_unit(0, 10, TimeUnit::Millisecond) + .unwrap() + .and(&TimestampRange::with_unit(1000, 12000, TimeUnit::Microsecond).unwrap()); + assert_eq!( + TimestampRange::with_unit(1, 10, TimeUnit::Millisecond).unwrap(), + anded + ); + } + + #[test] + fn test_range_or() { + assert_eq!( + TimestampRange::with_unit(1, 20, TimeUnit::Millisecond).unwrap(), + TimestampRange::with_unit(1, 10, TimeUnit::Millisecond) + .unwrap() + .or(&TimestampRange::with_unit(5, 20, TimeUnit::Millisecond).unwrap()) + ); + + assert_eq!( + TimestampRange::min_to_max(), + TimestampRange::min_to_max().or(&TimestampRange::min_to_max()) + ); + + let empty = TimestampRange::empty_with_value(Timestamp::new_millisecond(1)).or( + &TimestampRange::empty_with_value(Timestamp::new_millisecond(2)), + ); + assert!(empty.is_empty()); + + let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap(); + let t2 = TimestampRange::with_unit(-30, -20, TimeUnit::Second).unwrap(); + assert_eq!( + TimestampRange::with_unit(-30, 0, TimeUnit::Second).unwrap(), + t1.or(&t2) + ); + + let t1 = TimestampRange::with_unit(-10, 0, TimeUnit::Second).unwrap(); + assert_eq!(t1, t1.or(&t1)); + } } diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 2066dcb316..200f420c29 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -71,9 +71,29 @@ impl Timestamp { self.value } - pub fn convert_to(&self, unit: TimeUnit) -> i64 { - // TODO(hl): May result into overflow - self.value * self.unit.factor() / unit.factor() + /// Convert a timestamp to given time unit. + /// Conversion from a timestamp with smaller unit to a larger unit may cause rounding error. + /// Return `None` if conversion causes overflow. + pub fn convert_to(&self, unit: TimeUnit) -> Option { + if self.unit().factor() >= unit.factor() { + let mul = self.unit().factor() / unit.factor(); + let value = self.value.checked_mul(mul)?; + Some(Timestamp::new(value, unit)) + } else { + let mul = unit.factor() / self.unit().factor(); + Some(Timestamp::new(self.value.div_euclid(mul), unit)) + } + } + + /// Split a [Timestamp] into seconds part and nanoseconds part. + /// Notice the seconds part of split result is always rounded down to floor. + fn split(&self) -> (i64, i64) { + let sec_mul = TimeUnit::Second.factor() / self.unit.factor(); + let nsec_mul = self.unit.factor() / TimeUnit::Nanosecond.factor(); + + let sec_div = self.value.div_euclid(sec_mul); + let sec_mod = self.value.rem_euclid(sec_mul); + (sec_div, sec_mod * nsec_mul) } /// Format timestamp to ISO8601 string. If the timestamp exceeds what chrono timestamp can @@ -81,8 +101,7 @@ impl Timestamp { pub fn to_iso8601_string(&self) -> String { let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor(); - let mut secs = self.convert_to(TimeUnit::Second); - let mut nsecs = self.convert_to(TimeUnit::Nanosecond) % nano_factor; + let (mut secs, mut nsecs) = self.split(); if nsecs < 0 { secs -= 1; @@ -227,19 +246,33 @@ impl TimeUnit { impl PartialOrd for Timestamp { fn partial_cmp(&self, other: &Self) -> Option { - (self.value * self.unit.factor()).partial_cmp(&(other.value * other.unit.factor())) + Some(self.cmp(other)) } } +/// A proper implementation of total order requires antisymmetry, reflexivity, transitivity and totality. +/// In this comparison implementation, we map a timestamp uniquely to a `(i64, i64)` tuple which is respectively +/// total order. impl Ord for Timestamp { fn cmp(&self, other: &Self) -> Ordering { - (self.value * self.unit.factor()).cmp(&(other.value * other.unit.factor())) + // fast path: most comparisons use the same unit. + if self.unit == other.unit { + return self.value.cmp(&other.value); + } + + let (s_sec, s_nsec) = self.split(); + let (o_sec, o_nsec) = other.split(); + match s_sec.cmp(&o_sec) { + Ordering::Less => Ordering::Less, + Ordering::Greater => Ordering::Greater, + Ordering::Equal => s_nsec.cmp(&o_nsec), + } } } impl PartialEq for Timestamp { fn eq(&self, other: &Self) -> bool { - self.convert_to(TimeUnit::Nanosecond) == other.convert_to(TimeUnit::Nanosecond) + self.cmp(other) == Ordering::Equal } } @@ -247,14 +280,18 @@ impl Eq for Timestamp {} impl Hash for Timestamp { fn hash(&self, state: &mut H) { - state.write_i64(self.convert_to(TimeUnit::Nanosecond)); - state.finish(); + let (sec, nsec) = self.split(); + state.write_i64(sec); + state.write_i64(nsec); } } #[cfg(test)] mod tests { + use std::collections::hash_map::DefaultHasher; + use chrono::Offset; + use rand::Rng; use serde_json::Value; use super::*; @@ -284,6 +321,162 @@ mod tests { assert!(t > Timestamp::new(999, TimeUnit::Microsecond)); } + #[test] + fn test_timestamp_antisymmetry() { + let t1 = Timestamp::new(1, TimeUnit::Second); + let t2 = Timestamp::new(1000, TimeUnit::Millisecond); + assert!(t1 >= t2); + assert!(t2 >= t1); + assert_eq!(Ordering::Equal, t1.cmp(&t2)); + } + + fn gen_random_ts() -> Timestamp { + let units = [ + TimeUnit::Second, + TimeUnit::Millisecond, + TimeUnit::Microsecond, + TimeUnit::Nanosecond, + ]; + let mut rng = rand::thread_rng(); + let unit_idx: usize = rng.gen_range(0..4); + let unit = units[unit_idx]; + let value: i64 = rng.gen(); + Timestamp::new(value, unit) + } + + #[test] + fn test_timestamp_reflexivity() { + for _ in 0..1000 { + let ts = gen_random_ts(); + assert!(ts >= ts, "ts: {ts:?}"); + } + } + + /// Generate timestamp less than or equal to `threshold` + fn gen_ts_le(threshold: &Timestamp) -> Timestamp { + let mut rng = rand::thread_rng(); + let timestamp = rng.gen_range(i64::MIN..=threshold.value); + Timestamp::new(timestamp, threshold.unit) + } + + #[test] + fn test_timestamp_transitivity() { + let t0 = Timestamp::new_millisecond(100); + let t1 = gen_ts_le(&t0); + let t2 = gen_ts_le(&t1); + assert!(t0 >= t1, "t0: {t0:?}, t1: {t1:?}"); + assert!(t1 >= t2, "t1: {t1:?}, t2: {t2:?}"); + assert!(t0 >= t2, "t0: {t0:?}, t2: {t2:?}"); + + let t0 = Timestamp::new_millisecond(-100); + let t1 = gen_ts_le(&t0); // t0 >= t1 + let t2 = gen_ts_le(&t1); // t1 >= t2 + assert!(t0 >= t1, "t0: {t0:?}, t1: {t1:?}"); + assert!(t1 >= t2, "t1: {t1:?}, t2: {t2:?}"); + assert!(t0 >= t2, "t0: {t0:?}, t2: {t2:?}"); // check if t0 >= t2 + } + + #[test] + fn test_antisymmetry() { + let t0 = Timestamp::new(1, TimeUnit::Second); + let t1 = Timestamp::new(1000, TimeUnit::Millisecond); + assert!(t0 >= t1, "t0: {t0:?}, t1: {t1:?}"); + assert!(t1 >= t0, "t0: {t0:?}, t1: {t1:?}"); + assert_eq!(t1, t0, "t0: {t0:?}, t1: {t1:?}"); + } + + #[test] + fn test_strong_connectivity() { + let mut values = Vec::with_capacity(1000); + for _ in 0..1000 { + values.push(gen_random_ts()); + } + + for l in &values { + for r in &values { + assert!(l >= r || l <= r, "l: {l:?}, r: {r:?}"); + } + } + } + + #[test] + fn test_cmp_timestamp() { + let t1 = Timestamp::new(0, TimeUnit::Millisecond); + let t2 = Timestamp::new(0, TimeUnit::Second); + assert_eq!(t2, t1); + + let t1 = Timestamp::new(1, TimeUnit::Millisecond); + let t2 = Timestamp::new(-1, TimeUnit::Second); + assert!(t1 > t2); + + let t1 = Timestamp::new(i64::MAX / 1000 * 1000, TimeUnit::Millisecond); + let t2 = Timestamp::new(i64::MAX / 1000, TimeUnit::Second); + assert_eq!(t2, t1); + + let t1 = Timestamp::new(i64::MAX, TimeUnit::Millisecond); + let t2 = Timestamp::new(i64::MAX / 1000 + 1, TimeUnit::Second); + assert!(t2 > t1); + + let t1 = Timestamp::new(i64::MAX, TimeUnit::Millisecond); + let t2 = Timestamp::new(i64::MAX / 1000, TimeUnit::Second); + assert!(t2 < t1); + + let t1 = Timestamp::new(10_010_001, TimeUnit::Millisecond); + let t2 = Timestamp::new(100, TimeUnit::Second); + assert!(t1 > t2); + + let t1 = Timestamp::new(-100 * 10_001, TimeUnit::Millisecond); + let t2 = Timestamp::new(-100, TimeUnit::Second); + assert!(t2 > t1); + + let t1 = Timestamp::new(i64::MIN, TimeUnit::Millisecond); + let t2 = Timestamp::new(i64::MIN / 1000 - 1, TimeUnit::Second); + assert!(t1 > t2); + + let t1 = Timestamp::new(i64::MIN, TimeUnit::Millisecond); + let t2 = Timestamp::new(i64::MIN + 1, TimeUnit::Millisecond); + assert!(t2 > t1); + + let t1 = Timestamp::new(i64::MAX, TimeUnit::Millisecond); + let t2 = Timestamp::new(i64::MIN, TimeUnit::Second); + assert!(t1 > t2); + + let t1 = Timestamp::new(0, TimeUnit::Nanosecond); + let t2 = Timestamp::new(i64::MIN, TimeUnit::Second); + assert!(t1 > t2); + } + + fn check_hash_eq(t1: Timestamp, t2: Timestamp) { + let mut hasher = DefaultHasher::new(); + t1.hash(&mut hasher); + let t1_hash = hasher.finish(); + + let mut hasher = DefaultHasher::new(); + t2.hash(&mut hasher); + let t2_hash = hasher.finish(); + assert_eq!(t2_hash, t1_hash); + } + + #[test] + fn test_hash() { + check_hash_eq( + Timestamp::new(0, TimeUnit::Millisecond), + Timestamp::new(0, TimeUnit::Second), + ); + check_hash_eq( + Timestamp::new(1000, TimeUnit::Millisecond), + Timestamp::new(1, TimeUnit::Second), + ); + check_hash_eq( + Timestamp::new(1_000_000, TimeUnit::Microsecond), + Timestamp::new(1, TimeUnit::Second), + ); + check_hash_eq( + Timestamp::new(1_000_000_000, TimeUnit::Nanosecond), + Timestamp::new(1, TimeUnit::Second), + ); + } + #[test] pub fn test_from_i64() { let t: Timestamp = 42.into(); @@ -437,4 +630,92 @@ mod tests { } ); } + + #[test] + fn test_convert_timestamp() { + let ts = Timestamp::new(1, TimeUnit::Second); + assert_eq!( + Timestamp::new(1000, TimeUnit::Millisecond), + ts.convert_to(TimeUnit::Millisecond).unwrap() + ); + assert_eq!( + Timestamp::new(1_000_000, TimeUnit::Microsecond), + ts.convert_to(TimeUnit::Microsecond).unwrap() + ); + assert_eq!( + Timestamp::new(1_000_000_000, TimeUnit::Nanosecond), + ts.convert_to(TimeUnit::Nanosecond).unwrap() + ); + + let ts = Timestamp::new(1_000_100_100, TimeUnit::Nanosecond); + assert_eq!( + Timestamp::new(1_000_100, TimeUnit::Microsecond), + ts.convert_to(TimeUnit::Microsecond).unwrap() + ); + assert_eq!( + Timestamp::new(1000, TimeUnit::Millisecond), + ts.convert_to(TimeUnit::Millisecond).unwrap() + ); + assert_eq!( + Timestamp::new(1, TimeUnit::Second), + ts.convert_to(TimeUnit::Second).unwrap() + ); + + let ts = Timestamp::new(1_000_100_100, TimeUnit::Nanosecond); + assert_eq!(ts, ts.convert_to(TimeUnit::Nanosecond).unwrap()); + let ts = Timestamp::new(1_000_100_100, TimeUnit::Microsecond); + assert_eq!(ts, ts.convert_to(TimeUnit::Microsecond).unwrap()); + let ts = Timestamp::new(1_000_100_100, TimeUnit::Millisecond); + assert_eq!(ts, ts.convert_to(TimeUnit::Millisecond).unwrap()); + let ts = Timestamp::new(1_000_100_100, TimeUnit::Second); + assert_eq!(ts, ts.convert_to(TimeUnit::Second).unwrap()); + + // -9223372036854775808 in milliseconds should be rounded up to -9223372036854776 in seconds + assert_eq!( + Timestamp::new(-9223372036854776, TimeUnit::Second), + Timestamp::new(i64::MIN, TimeUnit::Millisecond) + .convert_to(TimeUnit::Second) + .unwrap() + ); + + assert!(Timestamp::new(i64::MAX, TimeUnit::Second) + .convert_to(TimeUnit::Millisecond) + .is_none()); + } + + #[test] + fn test_split() { + assert_eq!((0, 0), Timestamp::new(0, TimeUnit::Second).split()); + assert_eq!((1, 0), Timestamp::new(1, TimeUnit::Second).split()); + assert_eq!( + (0, 1_000_000), + Timestamp::new(1, TimeUnit::Millisecond).split() + ); + + assert_eq!((0, 1_000), Timestamp::new(1, TimeUnit::Microsecond).split()); + assert_eq!((0, 1), Timestamp::new(1, TimeUnit::Nanosecond).split()); + + assert_eq!( + (1, 1_000_000), + Timestamp::new(1001, TimeUnit::Millisecond).split() + ); + + assert_eq!( + (-2, 999_000_000), + Timestamp::new(-1001, TimeUnit::Millisecond).split() + ); + + // check min value of nanos + let (sec, nsec) = Timestamp::new(i64::MIN, TimeUnit::Nanosecond).split(); + assert_eq!( + i64::MIN as i128, + sec as i128 * (TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor()) as i128 + + nsec as i128 + ); + + assert_eq!( + (i64::MAX, 0), + Timestamp::new(i64::MAX, TimeUnit::Second).split() + ); + } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 901e98d9e1..6940b6a882 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -126,8 +126,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Binary(v) => row_writer.write_col(v.deref())?, Value::Date(v) => row_writer.write_col(v.val())?, Value::DateTime(v) => row_writer.write_col(v.val())?, - Value::Timestamp(v) => row_writer - .write_col(DateTime::new(v.convert_to(TimeUnit::Second)).to_string())?, + Value::Timestamp(v) => row_writer.write_col( + // safety: converting timestamp with whatever unit to second will not cause overflow + DateTime::new(v.convert_to(TimeUnit::Second).unwrap().value()).to_string(), + )?, Value::List(_) => { return Err(Error::Internal { err_msg: format!( diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 7824300b5f..2ce09c2c7e 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -15,6 +15,8 @@ use std::any::Any; use common_error::prelude::*; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use sqlparser::parser::ParserError; use sqlparser::tokenizer::TokenizerError; @@ -134,6 +136,17 @@ pub enum Error { #[snafu(display("Invalid sql value: {}", value))] InvalidSqlValue { value: String, backtrace: Backtrace }, + + #[snafu(display( + "Converting timestamp {:?} to unit {:?} overflow", + timestamp, + target_unit + ))] + TimestampOverflow { + timestamp: Timestamp, + target_unit: TimeUnit, + backtrace: Backtrace, + }, } impl ErrorExt for Error { @@ -160,6 +173,7 @@ impl ErrorExt for Error { SerializeColumnDefaultConstraint { source, .. } => source.status_code(), ConvertToGrpcDataType { source, .. } => source.status_code(), InvalidSqlValue { .. } => StatusCode::InvalidArguments, + TimestampOverflow { .. } => StatusCode::InvalidArguments, } } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 93ecf4a7a8..3b023dc4e6 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -32,7 +32,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use datatypes::types::DateTimeType; use datatypes::value::Value; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::ast::{ ColumnDef, ColumnOption, ColumnOptionDef, DataType as SqlDataType, Expr, ObjectName, @@ -40,7 +40,7 @@ use crate::ast::{ }; use crate::error::{ self, ColumnTypeMismatchSnafu, ConvertToGrpcDataTypeSnafu, InvalidSqlValueSnafu, - ParseSqlValueSnafu, Result, SerializeColumnDefaultConstraintSnafu, + ParseSqlValueSnafu, Result, SerializeColumnDefaultConstraintSnafu, TimestampOverflowSnafu, UnsupportedDefaultValueSnafu, }; @@ -112,10 +112,12 @@ fn parse_string_to_value( } ConcreteDataType::Timestamp(t) => { if let Ok(ts) = Timestamp::from_str(&s) { - Ok(Value::Timestamp(Timestamp::new( - ts.convert_to(t.unit()), - t.unit(), - ))) + Ok(Value::Timestamp(ts.convert_to(t.unit()).context( + TimestampOverflowSnafu { + timestamp: ts, + target_unit: t.unit(), + }, + )?)) } else { ParseSqlValueSnafu { msg: format!("Failed to parse {s} to Timestamp value"), diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index c53b207577..cbbb3d5d71 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -13,6 +13,7 @@ common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } +common-time = { path = "../common/time" } datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true