diff --git a/Cargo.lock b/Cargo.lock index c1cadcceb8..8ad2d13759 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -854,6 +854,7 @@ dependencies = [ "arrow2", "common-base", "common-error", + "common-time", "datafusion", "datafusion-common", "datafusion-expr", @@ -4484,6 +4485,7 @@ dependencies = [ "common-recordbatch", "common-runtime", "common-telemetry", + "common-time", "datatypes", "futures", "hyper", @@ -4936,6 +4938,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-telemetry", + "common-time", "datafusion-common", "datatypes", "futures", diff --git a/src/api/greptime/v1/column.proto b/src/api/greptime/v1/column.proto index 45dbd6bcbf..6454b9303b 100644 --- a/src/api/greptime/v1/column.proto +++ b/src/api/greptime/v1/column.proto @@ -67,4 +67,5 @@ enum ColumnDataType { STRING = 12; DATE = 13; DATETIME = 14; + TIMESTAMP = 15; } diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 1ad0fcd157..a8d9b2dd82 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -5,10 +5,11 @@ use std::sync::Arc; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; +use common_time::timestamp::Timestamp; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector}; use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; -use datatypes::vectors::{BinaryVector, Int64Vector, UInt8Vector}; +use datatypes::vectors::{BinaryVector, Int64Vector, TimestampVector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; @@ -129,7 +130,7 @@ fn build_system_catalog_schema() -> Schema { ), ColumnSchema::new( "timestamp".to_string(), - ConcreteDataType::int64_datatype(), + ConcreteDataType::timestamp_millis_datatype(), false, ), ColumnSchema::new( @@ -171,7 +172,7 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> // Timestamp in key part is intentionally left to 0 columns_values.insert( "timestamp".to_string(), - Arc::new(Int64Vector::from_slice(&[0])) as _, + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(0)])) as _, ); columns_values.insert( diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml index 8bf67a2a0a..b21c5e3058 100644 --- a/src/common/query/Cargo.toml +++ b/src/common/query/Cargo.toml @@ -9,10 +9,11 @@ version="0.10" [dependencies] common-error = { path = "../error" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} -datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"} -datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"} -datatypes = { path = "../../datatypes"} +common-time = { path = "../time" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } +datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } +datatypes = { path = "../../datatypes" } snafu = { version = "0.7", features = ["backtraces"] } statrs = "0.15" diff --git a/src/common/query/src/logical_plan/accumulator.rs b/src/common/query/src/logical_plan/accumulator.rs index d03c50423a..949024df95 100644 --- a/src/common/query/src/logical_plan/accumulator.rs +++ 
b/src/common/query/src/logical_plan/accumulator.rs @@ -4,6 +4,7 @@ use std::fmt::Debug; use std::sync::Arc; use arrow::array::ArrayRef; +use common_time::timestamp::TimeUnit; use datafusion_common::Result as DfResult; use datafusion_expr::Accumulator as DfAccumulator; use datatypes::prelude::*; @@ -184,9 +185,19 @@ fn try_into_scalar_value(value: Value, datatype: &ConcreteDataType) -> Result<ScalarValue> { Value::DateTime(v) => ScalarValue::Date64(Some(v.val())), Value::Null => try_convert_null_value(datatype)?, Value::List(list) => try_convert_list_value(list)?, + Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())), }) } +fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue { + match unit { + TimeUnit::Second => ScalarValue::TimestampSecond(val, None), + TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(val, None), + TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(val, None), + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(val, None), + } +} + fn try_convert_null_value(datatype: &ConcreteDataType) -> Result<ScalarValue> { Ok(match datatype { ConcreteDataType::Boolean(_) => ScalarValue::Boolean(None), @@ -202,6 +213,7 @@ fn try_convert_null_value(datatype: &ConcreteDataType) -> Result<ScalarValue> { ConcreteDataType::Float64(_) => ScalarValue::Float64(None), ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None), ConcreteDataType::String(_) => ScalarValue::Utf8(None), + ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit, None), _ => { return error::BadAccumulatorImplSnafu { err_msg: format!( @@ -427,4 +439,24 @@ mod tests { _ => unreachable!(), } } + + #[test] + pub fn test_timestamp_to_scalar_value() { + assert_eq!( + ScalarValue::TimestampSecond(Some(1), None), + timestamp_to_scalar_value(TimeUnit::Second, Some(1)) + ); + assert_eq!( + ScalarValue::TimestampMillisecond(Some(1), None), + timestamp_to_scalar_value(TimeUnit::Millisecond, Some(1)) + ); + assert_eq!( + ScalarValue::TimestampMicrosecond(Some(1), None), + timestamp_to_scalar_value(TimeUnit::Microsecond, Some(1)) + ); + assert_eq!( + ScalarValue::TimestampNanosecond(Some(1), None), + timestamp_to_scalar_value(TimeUnit::Nanosecond, Some(1)) + ); + } } diff --git a/src/common/time/src/date.rs b/src/common/time/src/date.rs index 0f1beb735d..6f13046afa 100644 --- a/src/common/time/src/date.rs +++ b/src/common/time/src/date.rs @@ -33,6 +33,12 @@ impl FromStr for Date { } } +impl From<i32> for Date { + fn from(v: i32) -> Self { + Self(v) + } +} + impl Display for Date { /// [Date] is formatted according to ISO-8601 standard.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -86,4 +92,10 @@ mod tests { date.0 += 1000; assert_eq!(date, Date::from_str(&date.to_string()).unwrap()); } + + #[test] + pub fn test_from() { + let d: Date = 42.into(); + assert_eq!(42, d.val()); + } } diff --git a/src/common/time/src/datetime.rs b/src/common/time/src/datetime.rs index f7feaf6e89..88b34a2c50 100644 --- a/src/common/time/src/datetime.rs +++ b/src/common/time/src/datetime.rs @@ -38,6 +38,12 @@ impl FromStr for DateTime { } } +impl From for DateTime { + fn from(v: i64) -> Self { + Self(v) + } +} + impl DateTime { pub fn new(val: i64) -> Self { Self(val) @@ -65,4 +71,10 @@ mod tests { let dt = DateTime::from_str(time).unwrap(); assert_eq!(time, &dt.to_string()); } + + #[test] + pub fn test_from() { + let d: DateTime = 42.into(); + assert_eq!(42, d.val()); + } } diff --git a/src/common/time/src/lib.rs b/src/common/time/src/lib.rs index e8bf43975e..3b2efc4af2 100644 --- a/src/common/time/src/lib.rs +++ b/src/common/time/src/lib.rs @@ -3,7 +3,8 @@ pub mod datetime; pub mod error; pub mod range; pub mod timestamp; +pub mod timestamp_millis; pub mod util; pub use range::RangeMillis; -pub use timestamp::TimestampMillis; +pub use timestamp_millis::TimestampMillis; diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index 8dc51193e6..ef2985a54e 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -1,4 +1,4 @@ -use crate::timestamp::TimestampMillis; +use crate::timestamp_millis::TimestampMillis; /// A half-open time range. /// diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 4eddc58d41..1b3433239e 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -1,77 +1,94 @@ +use core::default::Default; use std::cmp::Ordering; +use std::hash::{Hash, Hasher}; -/// Unix timestamp in millisecond resolution. -/// -/// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct TimestampMillis(i64); +use serde::{Deserialize, Serialize}; -impl TimestampMillis { - /// Positive infinity. - pub const INF: TimestampMillis = TimestampMillis::new(i64::MAX); - /// Maximum value of a timestamp. - /// - /// The maximum value of i64 is reserved for infinity. - pub const MAX: TimestampMillis = TimestampMillis::new(i64::MAX - 1); - /// Minimum value of a timestamp. - pub const MIN: TimestampMillis = TimestampMillis::new(i64::MIN); +#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)] +pub struct Timestamp { + value: i64, + unit: TimeUnit, +} - /// Create a new timestamp from unix timestamp in milliseconds. - pub const fn new(ms: i64) -> TimestampMillis { - TimestampMillis(ms) +impl Timestamp { + pub fn new(value: i64, unit: TimeUnit) -> Self { + Self { unit, value } } - /// Returns the timestamp aligned by `bucket_duration` in milliseconds or - /// `None` if overflow occurred. - /// - /// # Panics - /// Panics if `bucket_duration <= 0`. - pub fn align_by_bucket(self, bucket_duration: i64) -> Option { - assert!(bucket_duration > 0); - - let ts = if self.0 >= 0 { - self.0 - } else { - // `bucket_duration > 0` implies `bucket_duration - 1` won't overflow. - self.0.checked_sub(bucket_duration - 1)? - }; - - Some(TimestampMillis(ts / bucket_duration * bucket_duration)) + pub fn from_millis(value: i64) -> Self { + Self { + value, + unit: TimeUnit::Millisecond, + } } - /// Returns the timestamp value as i64. 
- pub fn as_i64(&self) -> i64 { - self.0 + pub fn unit(&self) -> TimeUnit { + self.unit + } + + pub fn value(&self) -> i64 { + self.value + } + + pub fn convert_to(&self, unit: TimeUnit) -> i64 { + // TODO(hl): May result in overflow + self.value * self.unit.factor() / unit.factor() + } } -impl From<i64> for TimestampMillis { - fn from(ms: i64) -> TimestampMillis { - TimestampMillis::new(ms) +impl From<i64> for Timestamp { + fn from(v: i64) -> Self { + Self { + value: v, + unit: TimeUnit::Millisecond, + } } } -impl PartialEq<i64> for TimestampMillis { - fn eq(&self, other: &i64) -> bool { - self.0 == *other +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum TimeUnit { + Second, + #[default] + Millisecond, + Microsecond, + Nanosecond, +} + +impl TimeUnit { + pub fn factor(&self) -> i64 { + match self { + TimeUnit::Second => 1_000_000_000, + TimeUnit::Millisecond => 1_000_000, + TimeUnit::Microsecond => 1_000, + TimeUnit::Nanosecond => 1, + } } } -impl PartialEq<TimestampMillis> for i64 { - fn eq(&self, other: &TimestampMillis) -> bool { - *self == other.0 +impl PartialOrd for Timestamp { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + (self.value * self.unit.factor()).partial_cmp(&(other.value * other.unit.factor())) + } } -impl PartialOrd<i64> for TimestampMillis { - fn partial_cmp(&self, other: &i64) -> Option<Ordering> { - Some(self.0.cmp(other)) +impl Ord for Timestamp { + fn cmp(&self, other: &Self) -> Ordering { + (self.value * self.unit.factor()).cmp(&(other.value * other.unit.factor())) + } } -impl PartialOrd<TimestampMillis> for i64 { - fn partial_cmp(&self, other: &TimestampMillis) -> Option<Ordering> { - Some(self.cmp(&other.0)) +impl PartialEq for Timestamp { + fn eq(&self, other: &Self) -> bool { + self.convert_to(TimeUnit::Nanosecond) == other.convert_to(TimeUnit::Nanosecond) + } +} + +impl Eq for Timestamp {} + +impl Hash for Timestamp { + fn hash<H: Hasher>(&self, state: &mut H) { + state.write_i64(self.convert_to(TimeUnit::Nanosecond)); + state.finish(); } } @@ -80,44 +97,34 @@ mod tests { use super::*; #[test] - fn test_timestamp() { - let ts = 123456; - let timestamp = TimestampMillis::from(ts); - assert_eq!(timestamp, ts); - assert_eq!(ts, timestamp); - assert_eq!(ts, timestamp.as_i64()); - - assert_ne!(TimestampMillis::new(0), timestamp); - assert!(TimestampMillis::new(-123) < TimestampMillis::new(0)); - assert!(TimestampMillis::new(10) < 20); - assert!(10 < TimestampMillis::new(20)); - - assert_eq!(i64::MAX, TimestampMillis::INF); - assert_eq!(i64::MAX - 1, TimestampMillis::MAX); - assert_eq!(i64::MIN, TimestampMillis::MIN); + pub fn test_time_unit() { + assert_eq!( + TimeUnit::Millisecond.factor() * 1000, + TimeUnit::Second.factor() + ); + assert_eq!( + TimeUnit::Microsecond.factor() * 1000000, + TimeUnit::Second.factor() + ); + assert_eq!( + TimeUnit::Nanosecond.factor() * 1000000000, + TimeUnit::Second.factor() + ); } #[test] - fn test_align_by_bucket() { - let bucket = 100; - assert_eq!(0, TimestampMillis::new(0).align_by_bucket(bucket).unwrap()); - assert_eq!(0, TimestampMillis::new(1).align_by_bucket(bucket).unwrap()); - assert_eq!(0, TimestampMillis::new(99).align_by_bucket(bucket).unwrap()); - assert_eq!( - 100, - TimestampMillis::new(100).align_by_bucket(bucket).unwrap() - ); - assert_eq!( - 100, - TimestampMillis::new(199).align_by_bucket(bucket).unwrap() - ); + pub fn test_timestamp() { + let t = Timestamp::new(1, TimeUnit::Millisecond); + assert_eq!(TimeUnit::Millisecond, t.unit()); + assert_eq!(1, t.value()); + assert_eq!(Timestamp::new(1000, TimeUnit::Microsecond), t); + assert!(t >
Timestamp::new(999, TimeUnit::Microsecond)); + } - assert_eq!(0, TimestampMillis::MAX.align_by_bucket(i64::MAX).unwrap()); - assert_eq!( - i64::MAX, - TimestampMillis::INF.align_by_bucket(i64::MAX).unwrap() - ); - - assert_eq!(None, TimestampMillis::MIN.align_by_bucket(bucket)); + #[test] + pub fn test_from_i64() { + let t: Timestamp = 42.into(); + assert_eq!(42, t.value()); + assert_eq!(TimeUnit::Millisecond, t.unit()); } } diff --git a/src/common/time/src/timestamp_millis.rs b/src/common/time/src/timestamp_millis.rs new file mode 100644 index 0000000000..4eddc58d41 --- /dev/null +++ b/src/common/time/src/timestamp_millis.rs @@ -0,0 +1,123 @@ +use std::cmp::Ordering; + +/// Unix timestamp in millisecond resolution. +/// +/// Negative timestamp is allowed, which represents timestamp before '1970-01-01T00:00:00'. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TimestampMillis(i64); + +impl TimestampMillis { + /// Positive infinity. + pub const INF: TimestampMillis = TimestampMillis::new(i64::MAX); + /// Maximum value of a timestamp. + /// + /// The maximum value of i64 is reserved for infinity. + pub const MAX: TimestampMillis = TimestampMillis::new(i64::MAX - 1); + /// Minimum value of a timestamp. + pub const MIN: TimestampMillis = TimestampMillis::new(i64::MIN); + + /// Create a new timestamp from unix timestamp in milliseconds. + pub const fn new(ms: i64) -> TimestampMillis { + TimestampMillis(ms) + } + + /// Returns the timestamp aligned by `bucket_duration` in milliseconds or + /// `None` if overflow occurred. + /// + /// # Panics + /// Panics if `bucket_duration <= 0`. + pub fn align_by_bucket(self, bucket_duration: i64) -> Option { + assert!(bucket_duration > 0); + + let ts = if self.0 >= 0 { + self.0 + } else { + // `bucket_duration > 0` implies `bucket_duration - 1` won't overflow. + self.0.checked_sub(bucket_duration - 1)? + }; + + Some(TimestampMillis(ts / bucket_duration * bucket_duration)) + } + + /// Returns the timestamp value as i64. 
+ pub fn as_i64(&self) -> i64 { + self.0 + } +} + +impl From for TimestampMillis { + fn from(ms: i64) -> TimestampMillis { + TimestampMillis::new(ms) + } +} + +impl PartialEq for TimestampMillis { + fn eq(&self, other: &i64) -> bool { + self.0 == *other + } +} + +impl PartialEq for i64 { + fn eq(&self, other: &TimestampMillis) -> bool { + *self == other.0 + } +} + +impl PartialOrd for TimestampMillis { + fn partial_cmp(&self, other: &i64) -> Option { + Some(self.0.cmp(other)) + } +} + +impl PartialOrd for i64 { + fn partial_cmp(&self, other: &TimestampMillis) -> Option { + Some(self.cmp(&other.0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_timestamp() { + let ts = 123456; + let timestamp = TimestampMillis::from(ts); + assert_eq!(timestamp, ts); + assert_eq!(ts, timestamp); + assert_eq!(ts, timestamp.as_i64()); + + assert_ne!(TimestampMillis::new(0), timestamp); + assert!(TimestampMillis::new(-123) < TimestampMillis::new(0)); + assert!(TimestampMillis::new(10) < 20); + assert!(10 < TimestampMillis::new(20)); + + assert_eq!(i64::MAX, TimestampMillis::INF); + assert_eq!(i64::MAX - 1, TimestampMillis::MAX); + assert_eq!(i64::MIN, TimestampMillis::MIN); + } + + #[test] + fn test_align_by_bucket() { + let bucket = 100; + assert_eq!(0, TimestampMillis::new(0).align_by_bucket(bucket).unwrap()); + assert_eq!(0, TimestampMillis::new(1).align_by_bucket(bucket).unwrap()); + assert_eq!(0, TimestampMillis::new(99).align_by_bucket(bucket).unwrap()); + assert_eq!( + 100, + TimestampMillis::new(100).align_by_bucket(bucket).unwrap() + ); + assert_eq!( + 100, + TimestampMillis::new(199).align_by_bucket(bucket).unwrap() + ); + + assert_eq!(0, TimestampMillis::MAX.align_by_bucket(i64::MAX).unwrap()); + assert_eq!( + i64::MAX, + TimestampMillis::INF.align_by_bucket(i64::MAX).unwrap() + ); + + assert_eq!(None, TimestampMillis::MIN.align_by_bucket(bucket)); + } +} diff --git a/src/datanode/src/server/grpc/create.rs b/src/datanode/src/server/grpc/create.rs index dfa936086b..94c7b56435 100644 --- a/src/datanode/src/server/grpc/create.rs +++ b/src/datanode/src/server/grpc/create.rs @@ -111,6 +111,7 @@ fn create_column_schema(column_def: &ColumnDef) -> Result { ColumnDataType::String => ConcreteDataType::string_datatype(), ColumnDataType::Date => ConcreteDataType::date_datatype(), ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(), + ColumnDataType::Timestamp => ConcreteDataType::timestamp_millis_datatype(), }; Ok(ColumnSchema { name: column_def.name.clone(), diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 3067f5ec40..2bab5b6319 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -6,6 +6,7 @@ use std::{ use api::v1::{codec::InsertBatch, column::Values, Column, InsertExpr}; use common_base::BitVec; +use common_time::timestamp::Timestamp; use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder}; use snafu::{ensure, OptionExt, ResultExt}; use table::{requests::InsertRequest, Table}; @@ -170,7 +171,23 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { .into_iter() .map(|val| val.into()) .collect(), - _ => unimplemented!(), + ConcreteDataType::DateTime(_) => values + .i64_values + .into_iter() + .map(|v| Value::DateTime(v.into())) + .collect(), + ConcreteDataType::Date(_) => values + .i32_values + .into_iter() + .map(|v| Value::Date(v.into())) + .collect(), + ConcreteDataType::Timestamp(_) => values + .i64_values + .into_iter() + 
.map(|v| Value::Timestamp(Timestamp::from_millis(v))) + .collect(), + ConcreteDataType::Null(_) => unreachable!(), + ConcreteDataType::List(_) => unreachable!(), } } diff --git a/src/datanode/src/server/grpc/select.rs b/src/datanode/src/server/grpc/select.rs index c5344b4d52..85dba25933 100644 --- a/src/datanode/src/server/grpc/select.rs +++ b/src/datanode/src/server/grpc/select.rs @@ -100,10 +100,10 @@ fn null_mask(arrays: &Vec>, row_count: usize) -> Vec { } macro_rules! convert_arrow_array_to_grpc_vals { - ($data_type: expr, $arrays: ident, $(($Type: ident, $CastType: ty, $field: ident, $MapFunction: expr)), +) => { + ($data_type: expr, $arrays: ident, $(($Type: pat, $CastType: ty, $field: ident, $MapFunction: expr)), +) => { match $data_type { $( - arrow::datatypes::DataType::$Type => { + $Type => { let mut vals = Values::default(); for array in $arrays { let array = array.as_any().downcast_ref::<$CastType>().with_context(|| ConversionSnafu { @@ -119,8 +119,8 @@ macro_rules! convert_arrow_array_to_grpc_vals { )+ _ => unimplemented!(), } - }; + } fn values(arrays: &[Arc]) -> Result { @@ -129,29 +129,34 @@ fn values(arrays: &[Arc]) -> Result { } let data_type = arrays[0].data_type(); + use arrow::datatypes::DataType; convert_arrow_array_to_grpc_vals!( data_type, arrays, - (Boolean, BooleanArray, bool_values, |x| {x}), + (DataType::Boolean, BooleanArray, bool_values, |x| {x}), - (Int8, PrimitiveArray, i8_values, |x| {*x as i32}), - (Int16, PrimitiveArray, i16_values, |x| {*x as i32}), - (Int32, PrimitiveArray, i32_values, |x| {*x}), - (Int64, PrimitiveArray, i64_values, |x| {*x}), + (DataType::Int8, PrimitiveArray, i8_values, |x| {*x as i32}), + (DataType::Int16, PrimitiveArray, i16_values, |x| {*x as i32}), + (DataType::Int32, PrimitiveArray, i32_values, |x| {*x}), + (DataType::Int64, PrimitiveArray, i64_values, |x| {*x}), - (UInt8, PrimitiveArray, u8_values, |x| {*x as u32}), - (UInt16, PrimitiveArray, u16_values, |x| {*x as u32}), - (UInt32, PrimitiveArray, u32_values, |x| {*x}), - (UInt64, PrimitiveArray, u64_values, |x| {*x}), + (DataType::UInt8, PrimitiveArray, u8_values, |x| {*x as u32}), + (DataType::UInt16, PrimitiveArray, u16_values, |x| {*x as u32}), + (DataType::UInt32, PrimitiveArray, u32_values, |x| {*x}), + (DataType::UInt64, PrimitiveArray, u64_values, |x| {*x}), - (Float32, PrimitiveArray, f32_values, |x| {*x}), - (Float64, PrimitiveArray, f64_values, |x| {*x}), + (DataType::Float32, PrimitiveArray, f32_values, |x| {*x}), + (DataType::Float64, PrimitiveArray, f64_values, |x| {*x}), - (Binary, BinaryArray, binary_values, |x| {x.into()}), - (LargeBinary, BinaryArray, binary_values, |x| {x.into()}), + (DataType::Binary, BinaryArray, binary_values, |x| {x.into()}), + (DataType::LargeBinary, BinaryArray, binary_values, |x| {x.into()}), - (Utf8, StringArray, string_values, |x| {x.into()}), - (LargeUtf8, StringArray, string_values, |x| {x.into()}) + (DataType::Utf8, StringArray, string_values, |x| {x.into()}), + (DataType::LargeUtf8, StringArray, string_values, |x| {x.into()}), + (DataType::Date32, PrimitiveArray, i32_values, |x| {*x as i32}), + (DataType::Date64, PrimitiveArray, i64_values, |x| {*x as i64}), + + (DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, _), PrimitiveArray, i64_values, |x| {*x} ) ) } diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index cb5f0cf039..810cb60c56 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -121,6 +121,7 @@ fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result 
Ok(ConcreteDataType::timestamp_millis_datatype()), _ => error::SqlTypeNotSupportedSnafu { t: data_type.clone(), } diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 6e9ad77ac4..de4e4d10b1 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -329,5 +329,9 @@ mod tests { SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")])), ConcreteDataType::datetime_datatype(), ); + check_type( + SqlDataType::Timestamp, + ConcreteDataType::timestamp_millis_datatype(), + ); } } diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index 40bc9b55c0..7c0fb19b49 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -156,7 +156,6 @@ fn parse_sql_value( parse_string_to_value(s.to_owned(), data_type)? } - _ => todo!("Other sql value"), }) } @@ -220,8 +219,10 @@ fn sql_number_to_value(data_type: &ConcreteDataType, n: &str) -> Result { (Int32, i32), (Int64, i64), (Float64, f64), - (Float32, f32) + (Float32, f32), + (Timestamp, i64) ) + // TODO(hl): also Date/DateTime } fn parse_sql_number(n: &str) -> Result diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 3c6a01e018..3785ceb3ae 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -147,7 +147,7 @@ fn testing_create_expr() -> CreateExpr { }, ColumnDef { name: "ts".to_string(), - data_type: 4, // int64 + data_type: 15, // timestamp is_nullable: true, }, ]; diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index dc625d9c68..355b6fa214 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -64,7 +64,7 @@ async fn test_sql_api() { let body = res.text().await; assert_eq!( body, - r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"host","data_type":"Utf8","is_nullable":false,"metadata":{}},{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"memory","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":"Int64","is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[["host"],[66.6],[1024.0],[0]]}]}}"# + r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"host","data_type":"Utf8","is_nullable":false,"metadata":{}},{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"memory","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":{"Timestamp":["Millisecond",null]},"is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[["host"],[66.6],[1024.0],[0]]}]}}"# ); // select with projections @@ -77,7 +77,7 @@ async fn test_sql_api() { let body = res.text().await; assert_eq!( body, - r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":"Int64","is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[[66.6],[0]]}]}}"# + r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"cpu","data_type":"Float64","is_nullable":true,"metadata":{}},{"name":"ts","data_type":{"Timestamp":["Millisecond",null]},"is_nullable":true,"metadata":{}}],"metadata":{"greptime:timestamp_column":"ts","greptime:version":"0"}},"columns":[[66.6],[0]]}]}}"# ); } diff --git a/src/datanode/src/tests/instance_test.rs 
b/src/datanode/src/tests/instance_test.rs index d62fcaddc1..4e0efe528e 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -125,13 +125,13 @@ async fn test_alter_table() { let pretty_print = arrow_print::write(&recordbatch); let pretty_print = pretty_print.lines().collect::>(); let expected = vec![ - "+-------+------+-----+--------+--------+", - "| host | ts | cpu | memory | my_tag |", - "+-------+------+-----+--------+--------+", - "| host1 | 1000 | 1.1 | 100 | |", - "| host2 | 2000 | 2.2 | 200 | hello |", - "| host3 | 3000 | 3.3 | 300 | |", - "+-------+------+-----+--------+--------+", + "+-------+---------------------+-----+--------+--------+", + "| host | ts | cpu | memory | my_tag |", + "+-------+---------------------+-----+--------+--------+", + "| host1 | 1970-01-01 00:00:01 | 1.1 | 100 | |", + "| host2 | 1970-01-01 00:00:02 | 2.2 | 200 | hello |", + "| host3 | 1970-01-01 00:00:03 | 3.3 | 300 | |", + "+-------+---------------------+-----+--------+--------+", ]; assert_eq!(pretty_print, expected); } diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 58a7b95230..8eb713a2d4 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -48,7 +48,7 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true), ]; let table_name = "demo"; diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 93d9450631..3017362023 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -1,16 +1,17 @@ use std::sync::Arc; use arrow::datatypes::DataType as ArrowDataType; +use common_time::timestamp::TimeUnit; use paste::paste; use serde::{Deserialize, Serialize}; use crate::error::{self, Error, Result}; use crate::type_id::LogicalTypeId; -use crate::types::DateTimeType; use crate::types::{ BinaryType, BooleanType, DateType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, ListType, NullType, StringType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; +use crate::types::{DateTimeType, TimestampType}; use crate::value::Value; use crate::vectors::MutableVector; @@ -38,6 +39,7 @@ pub enum ConcreteDataType { Date(DateType), DateTime(DateTimeType), + Timestamp(TimestampType), List(ListType), } @@ -70,6 +72,7 @@ impl ConcreteDataType { | ConcreteDataType::Int64(_) | ConcreteDataType::Date(_) | ConcreteDataType::DateTime(_) + | ConcreteDataType::Timestamp(_) ) } @@ -84,7 +87,10 @@ impl ConcreteDataType { } pub fn is_timestamp(&self) -> bool { - matches!(self, ConcreteDataType::Int64(_)) + matches!( + self, + ConcreteDataType::Int64(_) | ConcreteDataType::Timestamp(_) + ) } pub fn numerics() -> Vec { @@ -130,6 +136,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::Float64 => Self::float64_datatype(), ArrowDataType::Date32 => Self::date_datatype(), ArrowDataType::Date64 => Self::datetime_datatype(), + ArrowDataType::Timestamp(u, _) => ConcreteDataType::from_arrow_time_unit(u), ArrowDataType::Binary | ArrowDataType::LargeBinary => Self::binary_datatype(), ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => Self::string_datatype(), 
ArrowDataType::List(field) => Self::List(ListType::new( @@ -170,6 +177,31 @@ impl ConcreteDataType { pub fn list_datatype(inner_type: ConcreteDataType) -> ConcreteDataType { ConcreteDataType::List(ListType::new(inner_type)) } + + pub fn timestamp_datatype(unit: TimeUnit) -> Self { + ConcreteDataType::Timestamp(TimestampType::new(unit)) + } + + pub fn timestamp_millis_datatype() -> Self { + ConcreteDataType::Timestamp(TimestampType::new(TimeUnit::Millisecond)) + } + + /// Converts from arrow timestamp unit to + // TODO(hl): maybe impl From for our timestamp ? + pub fn from_arrow_time_unit(t: &arrow::datatypes::TimeUnit) -> Self { + match t { + arrow::datatypes::TimeUnit::Second => Self::timestamp_datatype(TimeUnit::Second), + arrow::datatypes::TimeUnit::Millisecond => { + Self::timestamp_datatype(TimeUnit::Millisecond) + } + arrow::datatypes::TimeUnit::Microsecond => { + Self::timestamp_datatype(TimeUnit::Microsecond) + } + arrow::datatypes::TimeUnit::Nanosecond => { + Self::timestamp_datatype(TimeUnit::Nanosecond) + } + } + } } /// Data type abstraction. @@ -288,4 +320,24 @@ mod tests { ConcreteDataType::Date(_) )); } + + #[test] + pub fn test_from_arrow_timestamp() { + assert_eq!( + ConcreteDataType::timestamp_millis_datatype(), + ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Millisecond) + ); + assert_eq!( + ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond), + ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Microsecond) + ); + assert_eq!( + ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond), + ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Nanosecond) + ); + assert_eq!( + ConcreteDataType::timestamp_datatype(TimeUnit::Second), + ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Second) + ); + } } diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index 5b7a49b78a..b9463b8bcd 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -1,5 +1,7 @@ use std::any::Any; +use common_time::timestamp::Timestamp; + use crate::prelude::*; use crate::vectors::date::DateVector; use crate::vectors::datetime::DateTimeVector; @@ -100,11 +102,11 @@ where builder.finish() } - fn from_vecs(values: Vec) -> Self { - let it = values.iter(); + fn from_vec>(values: Vec) -> Self { + let it = values.into_iter(); let mut builder = Self::Builder::with_capacity(get_iter_capacity(&it)); for item in it { - builder.push(Some(item.as_scalar_ref())); + builder.push(Some(item.into().as_scalar_ref())); } builder.finish() } @@ -286,6 +288,28 @@ impl<'a> ScalarRef<'a> for common_time::datetime::DateTime { } } +impl Scalar for Timestamp { + type VectorType = TimestampVector; + type RefType<'a> = Timestamp; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + *self + } + + fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> { + long + } +} + +impl<'a> ScalarRef<'a> for Timestamp { + type VectorType = TimestampVector; + type ScalarType = Timestamp; + + fn to_owned_scalar(&self) -> Self::ScalarType { + *self + } +} + #[cfg(test)] mod tests { use common_time::date::Date; @@ -350,4 +374,14 @@ mod tests { assert_eq!(date, date.as_scalar_ref()); assert_eq!(date, date.to_owned_scalar()); } + + #[test] + pub fn test_build_timestamp_vector() { + let expect: Vec> = vec![Some(10.into()), None, Some(42.into())]; + let vector: TimestampVector = build_vector_from_slice(&expect); + assert_vector_eq(&expect, &vector); + let val = vector.get_data(0).unwrap(); + 
assert_eq!(val, val.as_scalar_ref()); + assert_eq!(10, val.to_owned_scalar().value()); + } } diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index f663f614e9..5bd4e17463 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -30,6 +30,8 @@ pub enum LogicalTypeId { /// seconds/milliseconds/microseconds/nanoseconds, determined by precision. DateTime, + Timestamp, + List, } @@ -54,6 +56,7 @@ impl LogicalTypeId { LogicalTypeId::Binary => ConcreteDataType::binary_datatype(), LogicalTypeId::Date => ConcreteDataType::date_datatype(), LogicalTypeId::DateTime => ConcreteDataType::datetime_datatype(), + LogicalTypeId::Timestamp => ConcreteDataType::timestamp_millis_datatype(), // to timestamp type with default time unit LogicalTypeId::List => { ConcreteDataType::list_datatype(ConcreteDataType::null_datatype()) } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index f3ef94306f..a3bfda8aa5 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -7,6 +7,7 @@ mod null_type; mod primitive_traits; mod primitive_type; mod string_type; +mod timestamp; pub use binary_type::BinaryType; pub use boolean_type::BooleanType; @@ -20,3 +21,4 @@ pub use primitive_type::{ PrimitiveType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; pub use string_type::StringType; +pub use timestamp::TimestampType; diff --git a/src/datatypes/src/types/timestamp.rs b/src/datatypes/src/types/timestamp.rs new file mode 100644 index 0000000000..c527419bcf --- /dev/null +++ b/src/datatypes/src/types/timestamp.rs @@ -0,0 +1,111 @@ +use arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; +use common_time::timestamp::{TimeUnit, Timestamp}; +use serde::{Deserialize, Serialize}; + +use crate::data_type::DataType; +use crate::prelude::{LogicalTypeId, MutableVector, ScalarVectorBuilder, Value}; +use crate::vectors::TimestampVectorBuilder; + +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TimestampType { + pub unit: TimeUnit, +} + +impl TimestampType { + pub fn new(unit: TimeUnit) -> Self { + Self { unit } + } +} + +impl DataType for TimestampType { + fn name(&self) -> &str { + "Timestamp" + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::Timestamp + } + + fn default_value(&self) -> Value { + Value::Timestamp(Timestamp::new(0, self.unit)) + } + + fn as_arrow_type(&self) -> ArrowDataType { + match self.unit { + TimeUnit::Second => ArrowDataType::Timestamp(ArrowTimeUnit::Second, None), + TimeUnit::Millisecond => ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None), + TimeUnit::Microsecond => ArrowDataType::Timestamp(ArrowTimeUnit::Microsecond, None), + TimeUnit::Nanosecond => ArrowDataType::Timestamp(ArrowTimeUnit::Nanosecond, None), + } + } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(TimestampVectorBuilder::with_capacity(capacity)) + } +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::TimeUnit as ArrowTimeUnit; + use common_time::timestamp::TimeUnit::Microsecond; + + use super::*; + use crate::prelude::{ConcreteDataType, ValueRef}; + + #[test] + pub fn test_timestamp_type() { + assert_eq!( + LogicalTypeId::Timestamp, + TimestampType::new(TimeUnit::Microsecond).logical_type_id() + ); + } + + #[test] + pub fn test_as_arrow_type() { + assert_eq!( + ArrowDataType::Timestamp(ArrowTimeUnit::Nanosecond, None), + TimestampType::new(TimeUnit::Nanosecond).as_arrow_type() + ); + assert_eq!( + ArrowDataType::Timestamp(ArrowTimeUnit::Microsecond, 
None), + TimestampType::new(TimeUnit::Microsecond).as_arrow_type() + ); + assert_eq!( + ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None), + TimestampType::new(TimeUnit::Millisecond).as_arrow_type() + ); + assert_eq!( + ArrowDataType::Timestamp(ArrowTimeUnit::Second, None), + TimestampType::new(TimeUnit::Second).as_arrow_type() + ); + } + + #[test] + pub fn test_default_value() { + assert_eq!( + Value::Timestamp(Timestamp::new(0, Microsecond)), + TimestampType::new(TimeUnit::Microsecond).default_value() + ); + } + + #[test] + pub fn test_create_mutable_vector() { + let mut builder = TimestampType::new(TimeUnit::Microsecond).create_mutable_vector(10); + builder + .push_value_ref(ValueRef::Timestamp(Timestamp::new( + 42, + TimeUnit::Millisecond, + ))) + .unwrap(); + builder.push_value_ref(ValueRef::Null).unwrap(); + builder + .push_value_ref(ValueRef::Timestamp(Timestamp::new(96, TimeUnit::Second))) + .unwrap(); + let v = builder.to_vector(); + assert_eq!(ConcreteDataType::timestamp_millis_datatype(), v.data_type()); + assert_eq!(Value::Timestamp(Timestamp::from_millis(42)), v.get(0)); + assert_eq!(Value::Null, v.get(1)); + // Push a timestamp with different unit will convert the value to value with time unit millisecond. + assert_eq!(Value::Timestamp(Timestamp::from_millis(96_000)), v.get(2)); + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index aa98ba72e8..c2aa38dd56 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -3,6 +3,7 @@ use std::cmp::Ordering; use common_base::bytes::{Bytes, StringBytes}; use common_time::date::Date; use common_time::datetime::DateTime; +use common_time::timestamp::Timestamp; pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; @@ -40,6 +41,7 @@ pub enum Value { // Date & Time types: Date(Date), DateTime(DateTime), + Timestamp(Timestamp), List(ListValue), } @@ -68,6 +70,7 @@ impl Value { Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), Value::Date(_) => ConcreteDataType::date_datatype(), Value::DateTime(_) => ConcreteDataType::date_datatype(), + Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()), } } @@ -108,6 +111,7 @@ impl Value { Value::Date(v) => ValueRef::Date(*v), Value::DateTime(v) => ValueRef::DateTime(*v), Value::List(v) => ValueRef::List(ListValueRef::Ref(v)), + Value::Timestamp(v) => ValueRef::Timestamp(*v), } } } @@ -136,6 +140,7 @@ macro_rules! 
impl_ord_for_value_like { ($Type::Binary(v1), $Type::Binary(v2)) => v1.cmp(v2), ($Type::Date(v1), $Type::Date(v2)) => v1.cmp(v2), ($Type::DateTime(v1), $Type::DateTime(v2)) => v1.cmp(v2), + ($Type::Timestamp(v1), $Type::Timestamp(v2)) => v1.cmp(v2), ($Type::List(v1), $Type::List(v2)) => v1.cmp(v2), _ => panic!( "Cannot compare different values {:?} and {:?}", @@ -209,6 +214,12 @@ impl From> for Value { } } +impl From for Value { + fn from(v: Timestamp) -> Self { + Value::Timestamp(v) + } +} + impl From<&[u8]> for Value { fn from(bytes: &[u8]) -> Value { Value::Binary(bytes.into()) @@ -237,6 +248,7 @@ impl TryFrom for serde_json::Value { Value::Date(v) => serde_json::Value::Number(v.val().into()), Value::DateTime(v) => serde_json::Value::Number(v.val().into()), Value::List(v) => serde_json::to_value(v)?, + Value::Timestamp(v) => serde_json::to_value(v.value())?, }; Ok(json_value) @@ -311,7 +323,7 @@ pub enum ValueRef<'a> { // Date & Time types: Date(Date), DateTime(DateTime), - + Timestamp(Timestamp), List(ListValueRef<'a>), } @@ -363,6 +375,10 @@ impl<'a> ValueRef<'a> { impl_as_for_value_ref!(self, DateTime) } + pub fn as_timestamp(&self) -> Result> { + impl_as_for_value_ref!(self, Timestamp) + } + /// Cast itself to [ListValueRef]. pub fn as_list(&self) -> Result> { impl_as_for_value_ref!(self, List) @@ -605,6 +621,11 @@ mod tests { ConcreteDataType::binary_datatype(), Value::Binary(Bytes::from(b"world".as_slice())).data_type() ); + + assert_eq!( + ConcreteDataType::timestamp_millis_datatype(), + Value::Timestamp(Timestamp::from_millis(1)).data_type() + ); } #[test] @@ -696,6 +717,11 @@ mod tests { to_json(Value::DateTime(DateTime::new(5000))) ); + assert_eq!( + serde_json::Value::Number(1.into()), + to_json(Value::Timestamp(Timestamp::from_millis(1))) + ); + let json_value: serde_json::Value = serde_json::from_str(r#"{"items":[{"Int32":123}],"datatype":{"Int32":{}}}"#).unwrap(); assert_eq!( @@ -751,6 +777,7 @@ mod tests { check_as_value_ref!(Int64, -12); check_as_value_ref!(Float32, OrderedF32::from(16.0)); check_as_value_ref!(Float64, OrderedF64::from(16.0)); + check_as_value_ref!(Timestamp, Timestamp::from_millis(1)); assert_eq!( ValueRef::String("hello"), diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index b95fd39aae..1d9a45585e 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -11,6 +11,7 @@ pub mod mutable; pub mod null; pub mod primitive; mod string; +mod timestamp; use std::any::Any; use std::fmt::Debug; @@ -31,6 +32,7 @@ pub use null::*; pub use primitive::*; use snafu::ensure; pub use string::*; +pub use timestamp::*; use crate::data_type::ConcreteDataType; use crate::error::{self, Result}; diff --git a/src/datatypes/src/vectors/builder.rs b/src/datatypes/src/vectors/builder.rs index fea07e0a8a..ac3b1eb5ec 100644 --- a/src/datatypes/src/vectors/builder.rs +++ b/src/datatypes/src/vectors/builder.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use common_time::date::Date; use common_time::datetime::DateTime; +use common_time::timestamp::Timestamp; use crate::data_type::ConcreteDataType; use crate::scalars::ScalarVectorBuilder; @@ -11,8 +12,8 @@ use crate::vectors::datetime::DateTimeVectorBuilder; use crate::vectors::{ BinaryVectorBuilder, BooleanVectorBuilder, Float32VectorBuilder, Float64VectorBuilder, Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, MutableVector, - NullVector, StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, - UInt8VectorBuilder, VectorRef, + 
NullVector, StringVectorBuilder, TimestampVectorBuilder, UInt16VectorBuilder, + UInt32VectorBuilder, UInt64VectorBuilder, UInt8VectorBuilder, VectorRef, }; pub enum VectorBuilder { @@ -37,6 +38,7 @@ pub enum VectorBuilder { Date(DateVectorBuilder), DateTime(DateTimeVectorBuilder), + Timestamp(TimestampVectorBuilder), } impl VectorBuilder { @@ -92,6 +94,9 @@ impl VectorBuilder { ConcreteDataType::DateTime(_) => { VectorBuilder::DateTime(DateTimeVectorBuilder::with_capacity(capacity)) } + ConcreteDataType::Timestamp(_) => { + VectorBuilder::Timestamp(TimestampVectorBuilder::with_capacity(capacity)) + } _ => unimplemented!(), } } @@ -114,6 +119,7 @@ impl VectorBuilder { VectorBuilder::Binary(b) => b.data_type(), VectorBuilder::Date(b) => b.data_type(), VectorBuilder::DateTime(b) => b.data_type(), + VectorBuilder::Timestamp(b) => b.data_type(), } } @@ -141,6 +147,11 @@ impl VectorBuilder { (VectorBuilder::Date(b), Value::Int32(v)) => b.push(Some(Date::new(*v))), (VectorBuilder::DateTime(b), Value::DateTime(v)) => b.push(Some(*v)), (VectorBuilder::DateTime(b), Value::Int64(v)) => b.push(Some(DateTime::new(*v))), + (VectorBuilder::Timestamp(b), Value::Timestamp(t)) => b.push(Some(*t)), + (VectorBuilder::Timestamp(b), Value::Int64(v)) => { + b.push(Some(Timestamp::from_millis(*v))) + } + _ => panic!( "Value {:?} does not match builder type {:?}", value, @@ -167,6 +178,7 @@ impl VectorBuilder { VectorBuilder::Binary(b) => b.push(None), VectorBuilder::Date(b) => b.push(None), VectorBuilder::DateTime(b) => b.push(None), + VectorBuilder::Timestamp(b) => b.push(None), } } @@ -188,6 +200,7 @@ impl VectorBuilder { VectorBuilder::Binary(b) => Arc::new(b.finish()), VectorBuilder::Date(b) => Arc::new(b.finish()), VectorBuilder::DateTime(b) => Arc::new(b.finish()), + VectorBuilder::Timestamp(b) => Arc::new(b.finish()), } } } diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index d7e682a687..98a2bf042d 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -183,6 +183,9 @@ impl Helper { ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?), ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?), ArrowDataType::List(_) => Arc::new(ListVector::try_from_arrow_array(array)?), + ArrowDataType::Timestamp(_, _) => { + Arc::new(TimestampVector::try_from_arrow_array(array)?) + } _ => unimplemented!("Arrow array datatype: {:?}", array.as_ref().data_type()), }) } diff --git a/src/datatypes/src/vectors/timestamp.rs b/src/datatypes/src/vectors/timestamp.rs new file mode 100644 index 0000000000..77bf8dea35 --- /dev/null +++ b/src/datatypes/src/vectors/timestamp.rs @@ -0,0 +1,287 @@ +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, PrimitiveArray}; +use common_time::timestamp::{TimeUnit, Timestamp}; +use snafu::OptionExt; + +use crate::data_type::{ConcreteDataType, DataType}; +use crate::error; +use crate::error::Result; +use crate::prelude::{ + MutableVector, ScalarVector, ScalarVectorBuilder, Validity, Value, ValueRef, Vector, VectorRef, +}; +use crate::serialize::Serializable; +use crate::types::TimestampType; +use crate::vectors::{PrimitiveIter, PrimitiveVector, PrimitiveVectorBuilder}; + +/// `TimestampVector` stores timestamp in millisecond since UNIX Epoch. 
+#[derive(Debug, Clone, PartialEq)] +pub struct TimestampVector { + array: PrimitiveVector<i64>, +} + +impl TimestampVector { + pub fn new(array: PrimitiveArray<i64>) -> Self { + Self { + array: PrimitiveVector { array }, + } + } + + pub fn try_from_arrow_array(array: impl AsRef<dyn Array>) -> Result<Self> { + Ok(Self::new( + array + .as_ref() + .as_any() + .downcast_ref::<PrimitiveArray<i64>>() + .with_context(|| error::ConversionSnafu { + from: format!("{:?}", array.as_ref().data_type()), + })? + .clone(), + )) + } + + pub fn from_values<I: IntoIterator<Item = i64>>(iter: I) -> Self { + Self { + array: PrimitiveVector { + array: PrimitiveArray::from_values(iter), + }, + } + } +} + +impl Vector for TimestampVector { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::timestamp_millis_datatype() + } + + fn vector_type_name(&self) -> String { + "TimestampVector".to_string() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn len(&self) -> usize { + self.array.len() + } + + fn to_arrow_array(&self) -> ArrayRef { + let validity = self.array.array.validity().cloned(); + let buffer = self.array.array.values().clone(); + Arc::new(PrimitiveArray::new( + TimestampType::new(TimeUnit::Millisecond).as_arrow_type(), + buffer, + validity, + )) + } + + fn to_boxed_arrow_array(&self) -> Box<dyn Array> { + let validity = self.array.array.validity().cloned(); + let values = self.array.array.values().clone(); + Box::new(PrimitiveArray::new( + arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + values, + validity, + )) + } + + fn validity(&self) -> Validity { + self.array.validity() + } + + fn memory_size(&self) -> usize { + self.array.memory_size() + } + + fn is_null(&self, row: usize) -> bool { + self.array.is_null(row) + } + + fn slice(&self, offset: usize, length: usize) -> VectorRef { + Arc::new(Self { + array: PrimitiveVector { + array: self.array.array.slice(offset, length), + }, + }) + } + + fn get(&self, index: usize) -> Value { + match self.array.get(index) { + Value::Null => Value::Null, + Value::Int64(v) => Value::Timestamp(Timestamp::from_millis(v)), + _ => { + unreachable!() + } + } + } + + fn replicate(&self, offsets: &[usize]) -> VectorRef { + self.array.replicate(offsets) + } + + fn get_ref(&self, index: usize) -> ValueRef { + match self.array.get(index) { + Value::Int64(v) => ValueRef::Timestamp(Timestamp::from_millis(v)), + Value::Null => ValueRef::Null, + _ => unreachable!(), + } + } +} + +impl Serializable for TimestampVector { + fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> { + Ok(self + .array + .iter_data() + .map(|v| match v { + None => serde_json::Value::Null, + Some(v) => v.into(), + }) + .collect::<Vec<_>>()) + } +} + +impl ScalarVector for TimestampVector { + type OwnedItem = Timestamp; + type RefItem<'a> = Timestamp; + type Iter<'a> = TimestampDataIter<'a>; + type Builder = TimestampVectorBuilder; + + fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> { + self.array.get_data(idx).map(Timestamp::from_millis) + } + + fn iter_data(&self) -> Self::Iter<'_> { + TimestampDataIter { + iter: self.array.iter_data(), + } + } +} + +pub struct TimestampDataIter<'a> { + iter: PrimitiveIter<'a, i64>, +} + +impl<'a> Iterator for TimestampDataIter<'a> { + type Item = Option<Timestamp>; + + fn next(&mut self) -> Option<Self::Item> { + self.iter.next().map(|v| v.map(Timestamp::from_millis)) + } +} + +pub struct TimestampVectorBuilder { + buffer: PrimitiveVectorBuilder<i64>, +} + +impl MutableVector for TimestampVectorBuilder { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::timestamp_millis_datatype() + } + + fn len(&self) -> usize { + self.buffer.len() + } +
fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn to_vector(&mut self) -> VectorRef { + Arc::new(self.finish()) + } + + fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + // TODO(hl): vector and vector builder should also support customized time unit. + self.buffer.push( + value + .as_timestamp()? + .map(|t| t.convert_to(TimeUnit::Millisecond)), + ); + Ok(()) + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + let concrete_vector = vector + .as_any() + .downcast_ref::<TimestampVector>() + .with_context(|| error::CastTypeSnafu { + msg: format!( + "Failed to convert vector from {} to TimestampVector", + vector.vector_type_name() + ), + })?; + + self.buffer + .extend_slice_of(&concrete_vector.array, offset, length)?; + Ok(()) + } +} + +impl ScalarVectorBuilder for TimestampVectorBuilder { + type VectorType = TimestampVector; + + fn with_capacity(capacity: usize) -> Self { + Self { + buffer: PrimitiveVectorBuilder::with_capacity(capacity), + } + } + + /// Pushes a Timestamp value into the vector builder. The timestamp's time unit must be + /// `Second`/`Millisecond`/`Microsecond`. + fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) { + self.buffer + .push(value.map(|v| v.convert_to(TimeUnit::Millisecond))); + } + + fn finish(&mut self) -> Self::VectorType { + Self::VectorType { + array: self.buffer.finish(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_build_timestamp_vector() { + let mut builder = TimestampVectorBuilder::with_capacity(3); + builder.push(Some(Timestamp::new(1, TimeUnit::Second))); + builder.push(None); + builder.push(Some(Timestamp::new(2, TimeUnit::Millisecond))); + + let vector = builder.finish(); + assert_eq!( + ConcreteDataType::timestamp_millis_datatype(), + vector.data_type() + ); + assert_eq!(3, vector.len()); + assert_eq!( + Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)), + vector.get(0) + ); + + assert_eq!(Value::Null, vector.get(1)); + assert_eq!( + Value::Timestamp(Timestamp::new(2, TimeUnit::Millisecond)), + vector.get(2) + ); + + assert_eq!( + vec![ + Some(Timestamp::new(1000, TimeUnit::Millisecond)), + None, + Some(Timestamp::new(2, TimeUnit::Millisecond)), + ], + vector.iter_data().collect::<Vec<_>>() + ); + } +} diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index 1bcd21e392..ffa430acc8 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -15,6 +15,7 @@ use datatypes::arrow::{ }, }; use datatypes::data_type::ConcreteDataType; +use datatypes::prelude::Value; use datatypes::value::OrderedFloat; use datatypes::{ value, @@ -850,7 +851,9 @@ pub fn pyobj_try_to_typed_val( } } ConcreteDataType::List(_) => unreachable!(), - ConcreteDataType::Date(_) | ConcreteDataType::DateTime(_) => todo!(), + ConcreteDataType::Date(_) + | ConcreteDataType::DateTime(_) + | ConcreteDataType::Timestamp(_) => todo!(), } } else if is_instance::(&obj, vm) { // if Untyped then by default return types with highest precision @@ -906,6 +909,7 @@ pub fn val_to_pyobj(val: value::Value, vm: &VirtualMachine) -> PyObjectRef { // is `Date` and `DateTime` supported yet?
For now just ad hoc into PyInt value::Value::Date(v) => vm.ctx.new_int(v.val()).into(), value::Value::DateTime(v) => vm.ctx.new_int(v.val()).into(), + Value::Timestamp(v) => vm.ctx.new_int(v.value()).into(), value::Value::List(_) => unreachable!(), } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index ffd72b51de..ef9761e6cb 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -12,6 +12,7 @@ common-error = { path = "../common/error" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } +common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } futures = "0.3" hyper = { version = "0.14", features = ["full"] } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 4e365286ac..deb4509258 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -103,6 +103,8 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> { Value::Binary(v) => row_writer.write_col(v.deref())?, Value::Date(v) => row_writer.write_col(v.val())?, Value::DateTime(v) => row_writer.write_col(v.val())?, + Value::Timestamp(v) => row_writer + .write_col(v.convert_to(common_time::timestamp::TimeUnit::Second))?, // TODO(hl): Can we also write Value::List(_) => { return Err(Error::Internal { err_msg: format!( diff --git a/src/storage/proto/write_batch.proto b/src/storage/proto/write_batch.proto index 941f1ab790..c0730bad23 100644 --- a/src/storage/proto/write_batch.proto +++ b/src/storage/proto/write_batch.proto @@ -60,6 +60,7 @@ enum DataType { FLOAT64 = 11; STRING = 12; BINARY = 13; + TIMESTAMP = 14; } message Values { @@ -79,4 +80,5 @@ message Values { repeated bool bool_values = 11; repeated bytes binary_values = 12; repeated string string_values = 13; + repeated int64 timestamp_values = 14; } diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index cf98bd7d0d..9f4a9c742e 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -1,7 +1,9 @@ -use datatypes::arrow::array::{Int64Array, UInt64Array, UInt8Array}; +use common_time::timestamp::Timestamp; +use datatypes::arrow; +use datatypes::arrow::array::{Int64Array, PrimitiveArray, UInt64Array, UInt8Array}; use datatypes::prelude::*; use datatypes::type_id::LogicalTypeId; -use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder}; +use datatypes::vectors::{TimestampVectorBuilder, UInt64VectorBuilder}; use super::*; use crate::metadata::RegionMetadata; @@ -30,13 +32,13 @@ fn kvs_for_test_with_index( sequence: SequenceNumber, op_type: OpType, start_index_in_batch: usize, - keys: &[(i64, u64)], + keys: &[(Timestamp, u64)], values: &[(Option, Option)], ) -> KeyValues { assert_eq!(keys.len(), values.len()); let mut key_builders = ( - Int64VectorBuilder::with_capacity(keys.len()), + TimestampVectorBuilder::with_capacity(keys.len()), UInt64VectorBuilder::with_capacity(keys.len()), ); for key in keys { @@ -78,7 +80,7 @@ fn kvs_for_test_with_index( fn kvs_for_test( sequence: SequenceNumber, op_type: OpType, - keys: &[(i64, u64)], + keys: &[(Timestamp, u64)], values: &[(Option, Option)], ) -> KeyValues { kvs_for_test_with_index(sequence, op_type, 0, keys, values) @@ -91,7 +93,9 @@ pub fn write_kvs( keys: &[(i64, u64)], values: &[(Option, Option)], ) { - let kvs = kvs_for_test(sequence, op_type, keys, values); + let keys: Vec<(Timestamp, u64)> = keys.iter().map(|(l, r)| ((*l).into(), *r)).collect(); + + let kvs = 
kvs_for_test(sequence, op_type, &keys, values); memtable.write(&kvs).unwrap(); } @@ -111,6 +115,8 @@ fn check_iter_content( op_types: &[OpType], values: &[(Option, Option)], ) { + let keys: Vec<(Timestamp, u64)> = keys.iter().map(|(l, r)| ((*l).into(), *r)).collect(); + let mut index = 0; for batch in iter { let batch = batch.unwrap(); @@ -560,6 +566,12 @@ fn test_memtable_projection() { assert_eq!(5, batch.num_columns()); let k0 = Int64Array::from_slice(&[1000, 1001, 1002]); + let k0 = PrimitiveArray::new( + arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + k0.values().clone(), + k0.validity().cloned(), + ); + let k1 = UInt64Array::from_slice(&[0, 1, 2]); let v0 = UInt64Array::from_slice(&[10, 11, 12]); let sequences = UInt64Array::from_slice(&[9, 9, 9]); diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index 14e2c048a8..6942904be0 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -13,9 +13,10 @@ use datatypes::{ BinaryVector, BinaryVectorBuilder, BooleanVector, BooleanVectorBuilder, Float32Vector, Float32VectorBuilder, Float64Vector, Float64VectorBuilder, Int16Vector, Int16VectorBuilder, Int32Vector, Int32VectorBuilder, Int64Vector, Int64VectorBuilder, Int8Vector, - Int8VectorBuilder, StringVector, StringVectorBuilder, UInt16Vector, UInt16VectorBuilder, - UInt32Vector, UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector, - UInt8VectorBuilder, Vector, VectorRef, + Int8VectorBuilder, StringVector, StringVectorBuilder, TimestampVector, + TimestampVectorBuilder, UInt16Vector, UInt16VectorBuilder, UInt32Vector, + UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, + Vector, VectorRef, }, }; use paste::paste; @@ -137,6 +138,7 @@ impl From<&ConcreteDataType> for DataType { ConcreteDataType::String(_) => DataType::String, ConcreteDataType::Null(_) => DataType::Null, ConcreteDataType::Binary(_) => DataType::Binary, + ConcreteDataType::Timestamp(_) => DataType::Timestamp, _ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc. } } @@ -159,6 +161,7 @@ impl From for ConcreteDataType { DataType::String => ConcreteDataType::string_datatype(), DataType::Binary => ConcreteDataType::binary_datatype(), DataType::Null => ConcreteDataType::null_datatype(), + DataType::Timestamp => ConcreteDataType::timestamp_millis_datatype(), } } } @@ -221,6 +224,7 @@ gen_columns!(f64, Float64Vector, v, v); gen_columns!(bool, BooleanVector, v, v); gen_columns!(binary, BinaryVector, v, v.to_vec()); gen_columns!(string, StringVector, v, v.to_string()); +gen_columns!(timestamp, TimestampVector, v, v.value()); #[macro_export] macro_rules! 
diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs
index 14e2c048a8..6942904be0 100644
--- a/src/storage/src/proto/write_batch.rs
+++ b/src/storage/src/proto/write_batch.rs
@@ -13,9 +13,10 @@ use datatypes::{
         BinaryVector, BinaryVectorBuilder, BooleanVector, BooleanVectorBuilder, Float32Vector,
         Float32VectorBuilder, Float64Vector, Float64VectorBuilder, Int16Vector, Int16VectorBuilder,
         Int32Vector, Int32VectorBuilder, Int64Vector, Int64VectorBuilder, Int8Vector,
-        Int8VectorBuilder, StringVector, StringVectorBuilder, UInt16Vector, UInt16VectorBuilder,
-        UInt32Vector, UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector,
-        UInt8VectorBuilder, Vector, VectorRef,
+        Int8VectorBuilder, StringVector, StringVectorBuilder, TimestampVector,
+        TimestampVectorBuilder, UInt16Vector, UInt16VectorBuilder, UInt32Vector,
+        UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
+        Vector, VectorRef,
     },
 };
 use paste::paste;
@@ -137,6 +138,7 @@ impl From<&ConcreteDataType> for DataType {
             ConcreteDataType::String(_) => DataType::String,
             ConcreteDataType::Null(_) => DataType::Null,
             ConcreteDataType::Binary(_) => DataType::Binary,
+            ConcreteDataType::Timestamp(_) => DataType::Timestamp,
             _ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
         }
     }
@@ -159,6 +161,7 @@ impl From<DataType> for ConcreteDataType {
             DataType::String => ConcreteDataType::string_datatype(),
             DataType::Binary => ConcreteDataType::binary_datatype(),
             DataType::Null => ConcreteDataType::null_datatype(),
+            DataType::Timestamp => ConcreteDataType::timestamp_millis_datatype(),
         }
     }
 }
@@ -221,6 +224,7 @@ gen_columns!(f64, Float64Vector, v, v);
 gen_columns!(bool, BooleanVector, v, v);
 gen_columns!(binary, BinaryVector, v, v.to_vec());
 gen_columns!(string, StringVector, v, v.to_string());
+gen_columns!(timestamp, TimestampVector, v, v.value());
 
 #[macro_export]
 macro_rules! gen_put_data {
@@ -268,6 +272,7 @@ gen_put_data!(f64, Float64VectorBuilder, v, *v as f64);
 gen_put_data!(bool, BooleanVectorBuilder, v, *v);
 gen_put_data!(binary, BinaryVectorBuilder, v, v.as_slice());
 gen_put_data!(string, StringVectorBuilder, v, v.as_str());
+gen_put_data!(timestamp, TimestampVectorBuilder, v, (*v).into());
 
 pub fn gen_columns(vector: &VectorRef) -> Result<Column> {
     match vector.data_type() {
@@ -284,6 +289,7 @@ pub fn gen_columns(vector: &VectorRef) -> Result<Column> {
         ConcreteDataType::Float64(_) => gen_columns_f64(vector),
         ConcreteDataType::Binary(_) => gen_columns_binary(vector),
         ConcreteDataType::String(_) => gen_columns_string(vector),
+        ConcreteDataType::Timestamp(_) => gen_columns_timestamp(vector),
         _ => {
             unimplemented!() // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
         }
@@ -305,6 +311,7 @@ pub fn gen_put_data_vector(data_type: ConcreteDataType, column: Column) -> Result<VectorRef> {
         ConcreteDataType::Float64(_) => gen_put_data_f64(column),
         ConcreteDataType::Binary(_) => gen_put_data_binary(column),
         ConcreteDataType::String(_) => gen_put_data_string(column),
+        ConcreteDataType::Timestamp(_) => gen_put_data_timestamp(column),
         _ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
     }
 }
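In the storage proto codec above, timestamps travel through the new `repeated int64 timestamp_values` field: `gen_columns!` extracts `v.value()` on encode and `gen_put_data!` lifts each `i64` back with `(*v).into()`. A self-contained sketch of that round trip, using stand-in `Ts`/`encode`/`decode` names rather than the crate's API:

```rust
// Sketch of what the gen_columns!/gen_put_data! expansion amounts to for
// timestamps, with stand-in types; not the crate's code.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Ts(i64);

impl Ts {
    fn value(self) -> i64 {
        self.0
    }
}

impl From<i64> for Ts {
    fn from(v: i64) -> Self {
        Ts(v)
    }
}

// Encoding: each timestamp becomes a plain i64 in the repeated proto field,
// mirroring `gen_columns!(timestamp, TimestampVector, v, v.value())`.
fn encode(column: &[Ts]) -> Vec<i64> {
    column.iter().map(|v| v.value()).collect()
}

// Decoding: every i64 is lifted back into a timestamp, mirroring
// `gen_put_data!(timestamp, TimestampVectorBuilder, v, (*v).into())`.
fn decode(values: &[i64]) -> Vec<Ts> {
    values.iter().map(|v| (*v).into()).collect()
}

fn main() {
    let column = vec![Ts(1000), Ts(1001)];
    assert_eq!(decode(&encode(&column)), column);
}
```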
diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs
index 7f6350b6d8..4676b27abe 100644
--- a/src/storage/src/region/tests.rs
+++ b/src/storage/src/region/tests.rs
@@ -5,9 +5,10 @@ mod flush;
 mod projection;
 
 use common_telemetry::logging;
+use common_time::timestamp::Timestamp;
 use datatypes::prelude::ScalarVector;
 use datatypes::type_id::LogicalTypeId;
-use datatypes::vectors::Int64Vector;
+use datatypes::vectors::{Int64Vector, TimestampVector};
 use log_store::fs::{log::LocalFileLogStore, noop::NoopLogStore};
 use object_store::{backend::fs, ObjectStore};
 use store_api::storage::{
@@ -52,9 +53,11 @@ impl TesterBase {
     ///
     /// Format of data: (timestamp, v1), timestamp is key, v1 is value.
     pub async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
+        let data: Vec<(Timestamp, Option<i64>)> =
+            data.iter().map(|(l, r)| ((*l).into(), *r)).collect();
         // Build a batch without version.
         let mut batch = new_write_batch_for_test(false);
-        let put_data = new_put_data(data);
+        let put_data = new_put_data(&data);
         batch.put(put_data).unwrap();
         self.region.write(&self.write_ctx, batch).await.unwrap()
     }
@@ -63,7 +66,6 @@ impl TesterBase {
     /// Scan all data.
     pub async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
         logging::info!("Full scan with ctx {:?}", self.read_ctx);
-
         let snapshot = self.region.snapshot(&self.read_ctx).unwrap();
 
         let resp = snapshot
@@ -94,7 +96,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
     if enable_version_column {
         write_batch_util::new_write_batch(
            &[
-                (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
+                (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
                 (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
                 ("v1", LogicalTypeId::Int64, true),
             ],
@@ -103,7 +105,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
     } else {
         write_batch_util::new_write_batch(
             &[
-                (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
+                (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
                 ("v1", LogicalTypeId::Int64, true),
             ],
             Some(0),
@@ -111,10 +113,10 @@
     }
 }
 
-fn new_put_data(data: &[(i64, Option<i64>)]) -> PutData {
+fn new_put_data(data: &[(Timestamp, Option<i64>)]) -> PutData {
     let mut put_data = PutData::with_num_columns(2);
 
-    let timestamps = Int64Vector::from_values(data.iter().map(|kv| kv.0));
+    let timestamps = TimestampVector::from_vec(data.iter().map(|v| v.0).collect());
     let values = Int64Vector::from_iter(data.iter().map(|kv| kv.1));
 
     put_data
@@ -130,14 +132,14 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
     let timestamps = chunk.columns[0]
         .as_any()
-        .downcast_ref::<Int64Vector>()
+        .downcast_ref::<TimestampVector>()
         .unwrap();
     let values = chunk.columns[1]
         .as_any()
         .downcast_ref::<Int64Vector>()
         .unwrap();
     for (ts, value) in timestamps.iter_data().zip(values.iter_data()) {
-        dst.push((ts.unwrap(), value));
+        dst.push((ts.unwrap().value(), value));
     }
 }
@@ -164,7 +166,7 @@ async fn test_new_region() {
     let expect_schema = schema_util::new_schema_ref(
         &[
             ("k1", LogicalTypeId::Int32, false),
-            (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
+            (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
             (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
             ("v1", LogicalTypeId::Float32, true),
         ],
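The region tests keep asserting on plain `i64` values, so readers downcast the key column to the timestamp vector and unwrap the raw value, as `append_chunk_to` does above. A minimal sketch of the downcast-and-unwrap pattern using `std::any::Any` and a stand-in vector type:

```rust
use std::any::Any;

// Sketch of the downcast-then-unwrap pattern used by append_chunk_to, with a
// stand-in vector type instead of the datatypes crate.
struct TsVector(Vec<i64>);

fn timestamps_as_i64(column: &dyn Any) -> Vec<i64> {
    // Not expected to fail in the tests: the schema says column 0 is the
    // timestamp key, so the downcast target is known up front.
    let ts = column.downcast_ref::<TsVector>().unwrap();
    // Assertions keep comparing plain i64, so the raw values are taken here,
    // much like `ts.unwrap().value()` in the test above.
    ts.0.clone()
}

fn main() {
    let column = TsVector(vec![1000, 1001, 1002]);
    assert_eq!(timestamps_as_i64(&column), vec![1000, 1001, 1002]);
}
```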
diff --git a/src/storage/src/region/tests/projection.rs b/src/storage/src/region/tests/projection.rs
index b6b5486ff5..04a0918463 100644
--- a/src/storage/src/region/tests/projection.rs
+++ b/src/storage/src/region/tests/projection.rs
@@ -1,8 +1,9 @@
 use std::sync::Arc;
 
+use datatypes::data_type::ConcreteDataType;
 use datatypes::prelude::ScalarVector;
 use datatypes::type_id::LogicalTypeId;
-use datatypes::vectors::Int64Vector;
+use datatypes::vectors::{Int64Vector, TimestampVector};
 use log_store::fs::log::LocalFileLogStore;
 use store_api::logstore::LogStore;
 use store_api::storage::{
@@ -26,7 +27,7 @@ fn new_write_batch_for_test() -> WriteBatch {
     write_batch_util::new_write_batch(
         &[
             ("k0", LogicalTypeId::Int64, false),
-            (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false),
+            (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
             ("v0", LogicalTypeId::Int64, true),
             ("v1", LogicalTypeId::Int64, true),
         ],
@@ -46,7 +47,7 @@ fn new_put_data(len: usize, key_start: i64, ts_start: i64, initial_value: i64) -> PutData {
     let mut put_data = PutData::with_num_columns(4);
 
     let k0 = Int64Vector::from_values((0..len).map(|v| key_start + v as i64));
-    let ts = Int64Vector::from_values((0..len).map(|v| ts_start + v as i64));
+    let ts = TimestampVector::from_values((0..len).map(|v| ts_start + v as i64));
     let v0 = Int64Vector::from_values(std::iter::repeat(initial_value).take(len));
     let v1 = Int64Vector::from_values((0..len).map(|v| initial_value + v as i64));
@@ -68,13 +69,27 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<Vec<i64>>) {
     dst.resize(num_rows, Vec::new());
     for (i, row) in dst.iter_mut().enumerate() {
         for col in &chunk.columns {
-            let val = col
-                .as_any()
-                .downcast_ref::<Int64Vector>()
-                .unwrap()
-                .get_data(i)
-                .unwrap();
-            row.push(val);
+            match col.data_type() {
+                ConcreteDataType::Int64(_) => {
+                    let val = col
+                        .as_any()
+                        .downcast_ref::<Int64Vector>()
+                        .unwrap()
+                        .get_data(i)
+                        .unwrap();
+                    row.push(val);
+                }
+                ConcreteDataType::Timestamp(_) => {
+                    let val = col
+                        .as_any()
+                        .downcast_ref::<TimestampVector>()
+                        .unwrap()
+                        .get_data(i)
+                        .unwrap();
+                    row.push(val.value());
+                }
+                _ => unreachable!(),
+            }
         }
     }
 }
diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs
index 1f152203a7..0e5ae4f50a 100644
--- a/src/storage/src/schema.rs
+++ b/src/storage/src/schema.rs
@@ -722,7 +722,7 @@ mod tests {
         let expect_schema = schema_util::new_schema_with_version(
             &[
                 ("k0", LogicalTypeId::Int64, false),
-                ("timestamp", LogicalTypeId::Int64, false),
+                ("timestamp", LogicalTypeId::Timestamp, false),
                 ("v0", LogicalTypeId::Int64, true),
             ],
             Some(1),
@@ -758,7 +758,7 @@ mod tests {
         let expect_schema = schema_util::new_schema_with_version(
             &[
                 ("k0", LogicalTypeId::Int64, false),
-                ("timestamp", LogicalTypeId::Int64, false),
+                ("timestamp", LogicalTypeId::Timestamp, false),
                 ("v0", LogicalTypeId::Int64, true),
                 (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false),
                 (consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false),
@@ -839,7 +839,7 @@ mod tests {
         let expect_user = schema_util::new_schema_with_version(
             &[
                 ("v1", LogicalTypeId::Int64, true),
-                ("timestamp", LogicalTypeId::Int64, false),
+                ("timestamp", LogicalTypeId::Timestamp, false),
             ],
             Some(1),
             123,
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index c9190769a6..3dcfce7291 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -263,8 +263,10 @@ impl BatchReader for ChunkStream {
 mod tests {
     use std::sync::Arc;
 
-    use datatypes::arrow::array::{Array, Int64Array, UInt64Array, UInt8Array};
+    use datatypes::arrow::array::{Array, UInt64Array, UInt8Array};
     use datatypes::arrow::io::parquet::read::FileReader;
+    use datatypes::prelude::{ScalarVector, Vector};
+    use datatypes::vectors::TimestampVector;
     use object_store::backend::fs::Backend;
     use store_api::storage::OpType;
     use tempdir::TempDir;
@@ -324,9 +326,15 @@ mod tests {
         // timestamp
         assert_eq!(
-            Arc::new(Int64Array::from_slice(&[
-                1000, 1000, 1001, 2002, 2003, 2003
-            ])) as Arc<dyn Array>,
+            TimestampVector::from_slice(&[
+                1000.into(),
+                1000.into(),
+                1001.into(),
+                2002.into(),
+                2003.into(),
+                2003.into()
+            ])
+            .to_arrow_array(),
             chunk.arrays()[0]
         );
diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs
index 99139e2cd8..8fc1a31a3c 100644
--- a/src/storage/src/test_util/descriptor_util.rs
+++ b/src/storage/src/test_util/descriptor_util.rs
@@ -22,7 +22,7 @@ impl RegionDescBuilder {
             ColumnDescriptorBuilder::new(
                 2,
                 test_util::TIMESTAMP_NAME,
-                ConcreteDataType::int64_datatype(),
+                ConcreteDataType::timestamp_millis_datatype(),
             )
             .is_nullable(false)
             .build()
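Because a chunk can now mix `Int64` value columns with a `Timestamp` key column, the projection test dispatches on the column's data type before downcasting and flattens both cases to `i64`. A standalone sketch of that dispatch, with stand-in `Col`/`Ts` types instead of `ConcreteDataType` and the concrete vectors:

```rust
// Standalone sketch of the type-dispatched extraction in append_chunk_to:
// `Col` and `Ts` are stand-ins for the column trait object and timestamp type.
#[derive(Clone, Copy)]
struct Ts(i64);

impl Ts {
    fn value(self) -> i64 {
        self.0
    }
}

enum Col {
    Int64(Vec<i64>),
    Timestamp(Vec<Ts>),
}

// Flatten row `i` into plain i64s regardless of each column's logical type,
// mirroring the two match arms (push `val` vs. push `val.value()`).
fn row_as_i64(columns: &[Col], i: usize) -> Vec<i64> {
    columns
        .iter()
        .map(|col| match col {
            Col::Int64(v) => v[i],
            Col::Timestamp(v) => v[i].value(),
        })
        .collect()
}

fn main() {
    let columns = vec![
        Col::Int64(vec![10, 11]),
        Col::Timestamp(vec![Ts(1000), Ts(1001)]),
    ];
    assert_eq!(row_as_i64(&columns, 1), vec![11, 1001]);
}
```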
diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs
index e9f99db132..4ab2bc0299 100644
--- a/src/storage/src/write_batch.rs
+++ b/src/storage/src/write_batch.rs
@@ -7,12 +7,10 @@ use std::{
 
 use common_error::prelude::*;
 use common_time::{RangeMillis, TimestampMillis};
+use datatypes::vectors::TimestampVector;
 use datatypes::{
-    arrow::error::ArrowError,
-    data_type::ConcreteDataType,
-    prelude::ScalarVector,
-    schema::SchemaRef,
-    vectors::{Int64Vector, VectorRef},
+    arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector,
+    schema::SchemaRef, vectors::VectorRef,
 };
 use prost::{DecodeError, EncodeError};
 use snafu::ensure;
@@ -204,11 +202,10 @@ impl WriteRequest for WriteBatch {
             let column = put_data
                 .column_by_name(ts_col_name)
                 .unwrap_or_else(|| panic!("Cannot find column by name: {}", ts_col_name));
-
-            let ts_vector = column.as_any().downcast_ref::<Int64Vector>().unwrap(); // not expected to fail
+            let ts_vector = column.as_any().downcast_ref::<TimestampVector>().unwrap(); // not expected to fail
             for ts in ts_vector.iter_data().flatten() {
-                let aligned = align_timestamp(ts, durations_millis)
-                    .context(TimestampOverflowSnafu { ts })?;
+                let aligned = align_timestamp(ts.value(), durations_millis)
+                    .context(TimestampOverflowSnafu { ts: ts.value() })?;
                 aligned_timestamps.insert(aligned);
             }
         }
@@ -857,7 +854,7 @@ mod tests {
         &[
             ("k1", LogicalTypeId::UInt64, false),
             (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
-            ("ts", LogicalTypeId::Int64, false),
+            ("ts", LogicalTypeId::Timestamp, false),
             ("v1", LogicalTypeId::Boolean, true),
         ],
         Some(2),
@@ -868,7 +865,7 @@ fn test_write_batch_put() {
         let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
         let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));
-        let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
+        let tsv = Arc::new(TimestampVector::from_vec(vec![0, 0, 0]));
 
         let mut put_data = PutData::new();
         put_data.add_key_column("k1", intv.clone()).unwrap();
@@ -972,7 +969,7 @@
     #[test]
     fn test_put_unknown_column() {
         let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
-        let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
+        let tsv = Arc::new(TimestampVector::from_vec(vec![0, 0, 0]));
         let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));
 
         let mut put_data = PutData::new();
@@ -1012,7 +1009,7 @@
     #[test]
     pub fn test_write_batch_time_range() {
         let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
-        let tsv = Arc::new(Int64Vector::from_vec(vec![-21, -20, -1, 0, 1, 20]));
+        let tsv = Arc::new(TimestampVector::from_vec(vec![-21, -20, -1, 0, 1, 20]));
         let boolv = Arc::new(BooleanVector::from(vec![
             true, false, true, false, false, false,
         ]));
@@ -1041,7 +1038,7 @@
         for i in 0..10 {
             let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
             let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
-            let tsv = Arc::new(Int64Vector::from_vec(vec![i, i, i]));
+            let tsv = Arc::new(TimestampVector::from_vec(vec![i, i, i]));
 
             let mut put_data = PutData::new();
             put_data.add_key_column("k1", intv.clone()).unwrap();
@@ -1095,7 +1092,7 @@
         let mut batch = new_test_batch();
         for _ in 0..10 {
             let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
-            let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
+            let tsv = Arc::new(TimestampVector::from_vec(vec![0, 0, 0]));
 
             let mut put_data = PutData::new();
             put_data.add_key_column("k1", intv.clone()).unwrap();
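In `write_batch.rs` the alignment logic itself is unchanged; the only difference is that the raw millisecond value now comes from `ts.value()`. The crate's `align_timestamp` is not shown in this diff, so the following is only a sketch of aligning a millisecond timestamp down to the start of a bucket; the checked arithmetic and the `None`-on-overflow behavior are assumptions.

```rust
// Illustrative sketch of aligning a millisecond timestamp down to the start
// of its range bucket. Not the crate's align_timestamp.
fn align_to_bucket(ts_millis: i64, bucket_millis: i64) -> Option<i64> {
    debug_assert!(bucket_millis > 0);
    // Euclidean division floors toward negative infinity, so negative
    // timestamps land in the bucket that starts at or before them.
    ts_millis
        .checked_div_euclid(bucket_millis)?
        .checked_mul(bucket_millis)
}

fn main() {
    assert_eq!(align_to_bucket(25, 10), Some(20));
    assert_eq!(align_to_bucket(-21, 10), Some(-30));
    // Alignment that would underflow i64 is reported as overflow (None).
    assert_eq!(align_to_bucket(i64::MIN, 10), None);
}
```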
"../common/telemetry" } +common-time = { path = "../common/time" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } futures = "0.3" diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 9342cf85b0..2da9ad5721 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -419,7 +419,7 @@ mod tests { use common_recordbatch::util; use datafusion_common::field_util::FieldExt; use datafusion_common::field_util::SchemaExt; - use datatypes::prelude::ConcreteDataType; + use datatypes::prelude::{ConcreteDataType, ScalarVector}; use datatypes::schema::ColumnSchema; use datatypes::vectors::*; use store_api::manifest::Manifest; @@ -461,7 +461,7 @@ mod tests { let hosts = StringVector::from(vec!["host1", "host2"]); let cpus = Float64Vector::from_vec(vec![55.5, 66.6]); let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]); - let tss = Int64Vector::from_vec(vec![1, 2]); + let tss = TimestampVector::from_vec(vec![1, 2]); columns_values.insert("host".to_string(), Arc::new(hosts.clone())); columns_values.insert("cpu".to_string(), Arc::new(cpus.clone())); @@ -541,7 +541,7 @@ mod tests { let hosts = StringVector::from(vec!["host1"; test_batch_size]); let cpus = Float64Vector::from_vec(vec![55.5; test_batch_size]); let memories = Float64Vector::from_vec(vec![1024f64; test_batch_size]); - let tss = Int64Vector::from_vec((0..test_batch_size).map(|v| v as i64).collect()); + let tss = TimestampVector::from_values((0..test_batch_size).map(|v| v as i64)); columns_values.insert("host".to_string(), Arc::new(hosts)); columns_values.insert("cpu".to_string(), Arc::new(cpus)); diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index 99b1195285..b49f077720 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -29,7 +29,11 @@ pub fn schema_for_test() -> Schema { ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond), + true, + ), ]; SchemaBuilder::from(column_schemas)