diff --git a/Cargo.lock b/Cargo.lock index 60ae23a969..90a30a51eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1975,6 +1975,7 @@ dependencies = [ name = "common-time" version = "0.3.2" dependencies = [ + "arrow", "chrono", "chrono-tz 0.8.2", "common-error", @@ -4137,7 +4138,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/MichaelScofield/greptime-proto.git?rev=a98b81b660080e13f01b9cedc9778de9aa1c19b9#a98b81b660080e13f01b9cedc9778de9aa1c19b9" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=bec16e50c9322758111f73e42fb5d377c7235e05#bec16e50c9322758111f73e42fb5d377c7235e05" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index c500f1c61a..c0bda55ba5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/MichaelScofield/greptime-proto.git", rev = "a98b81b660080e13f01b9cedc9778de9aa1c19b9" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "bec16e50c9322758111f73e42fb5d377c7235e05" } itertools = "0.10" lazy_static = "1.4" parquet = "40.0" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index d666decfcc..740355746d 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -15,7 +15,7 @@ use common_base::BitVec; use common_time::timestamp::TimeUnit; use datatypes::prelude::ConcreteDataType; -use datatypes::types::TimestampType; +use datatypes::types::{TimeType, TimestampType}; use datatypes::value::Value; use datatypes::vectors::VectorRef; use greptime_proto::v1::ddl_request::Expr; @@ -71,7 +71,10 @@ impl From for ConcreteDataType { ColumnDataType::TimestampNanosecond => { ConcreteDataType::timestamp_nanosecond_datatype() } - _ => unimplemented!("Implemented in #1961"), + ColumnDataType::TimeSecond => ConcreteDataType::time_second_datatype(), + ColumnDataType::TimeMillisecond => ConcreteDataType::time_millisecond_datatype(), + ColumnDataType::TimeMicrosecond => ConcreteDataType::time_microsecond_datatype(), + ColumnDataType::TimeNanosecond => ConcreteDataType::time_nanosecond_datatype(), } } } @@ -96,12 +99,18 @@ impl TryFrom for ColumnDataTypeWrapper { ConcreteDataType::String(_) => ColumnDataType::String, ConcreteDataType::Date(_) => ColumnDataType::Date, ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, - ConcreteDataType::Timestamp(unit) => match unit { + ConcreteDataType::Timestamp(t) => match t { TimestampType::Second(_) => ColumnDataType::TimestampSecond, TimestampType::Millisecond(_) => ColumnDataType::TimestampMillisecond, TimestampType::Microsecond(_) => ColumnDataType::TimestampMicrosecond, TimestampType::Nanosecond(_) => ColumnDataType::TimestampNanosecond, }, + ConcreteDataType::Time(t) => match t { + TimeType::Second(_) => ColumnDataType::TimeSecond, + TimeType::Millisecond(_) => ColumnDataType::TimeMillisecond, + TimeType::Microsecond(_) => ColumnDataType::TimeMicrosecond, + TimeType::Nanosecond(_) => ColumnDataType::TimeNanosecond, + }, ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { @@ -190,7 +199,22 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values ts_nanosecond_values: Vec::with_capacity(capacity), ..Default::default() }, - _ => unimplemented!("Implemented in #1961"), + ColumnDataType::TimeSecond => Values { + time_second_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::TimeMillisecond => Values { + time_millisecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::TimeMicrosecond => Values { + time_microsecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, + ColumnDataType::TimeNanosecond => Values { + time_nanosecond_values: Vec::with_capacity(capacity), + ..Default::default() + }, } } @@ -225,6 +249,12 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()), TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()), }, + Value::Time(val) => match val.unit() { + TimeUnit::Second => values.time_second_values.push(val.value()), + TimeUnit::Millisecond => values.time_millisecond_values.push(val.value()), + TimeUnit::Microsecond => values.time_microsecond_values.push(val.value()), + TimeUnit::Nanosecond => values.time_nanosecond_values.push(val.value()), + }, Value::List(_) => unreachable!(), }); column.null_mask = null_mask.into_vec(); @@ -268,7 +298,8 @@ mod tests { use std::sync::Arc; use datatypes::vectors::{ - BooleanVector, TimestampMicrosecondVector, TimestampMillisecondVector, + BooleanVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, + TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, }; @@ -331,6 +362,10 @@ mod tests { let values = values_with_capacity(ColumnDataType::TimestampMillisecond, 2); let values = values.ts_millisecond_values; assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::TimeMillisecond, 2); + let values = values.time_millisecond_values; + assert_eq!(2, values.capacity()); } #[test] @@ -399,6 +434,10 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond).into() ); + assert_eq!( + ConcreteDataType::time_datatype(TimeUnit::Millisecond), + ColumnDataTypeWrapper(ColumnDataType::TimeMillisecond).into() + ); } #[test] @@ -527,6 +566,47 @@ mod tests { ); } + #[test] + fn test_column_put_time_values() { + let mut column = Column { + column_name: "test".to_string(), + semantic_type: 0, + values: Some(Values { + ..Default::default() + }), + null_mask: vec![], + datatype: 0, + }; + + let vector = Arc::new(TimeNanosecondVector::from_vec(vec![1, 2, 3])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![1, 2, 3], + column.values.as_ref().unwrap().time_nanosecond_values + ); + + let vector = Arc::new(TimeMillisecondVector::from_vec(vec![4, 5, 6])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![4, 5, 6], + column.values.as_ref().unwrap().time_millisecond_values + ); + + let vector = Arc::new(TimeMicrosecondVector::from_vec(vec![7, 8, 9])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![7, 8, 9], + column.values.as_ref().unwrap().time_microsecond_values + ); + + let vector = Arc::new(TimeSecondVector::from_vec(vec![10, 11, 12])); + push_vals(&mut column, 3, vector); + assert_eq!( + vec![10, 11, 12], + column.values.as_ref().unwrap().time_second_values + ); + } + #[test] fn test_column_put_vector() { use crate::v1::column::SemanticType; diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 6b6e9e38be..b220e2e6f4 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -22,17 +22,19 @@ use api::v1::{ InsertRequest as GrpcInsertRequest, }; use common_base::BitVec; +use common_time::time::Time; use common_time::timestamp::Timestamp; use common_time::{Date, DateTime}; use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::{ValueRef, VectorRef}; use datatypes::scalars::ScalarVector; use datatypes::schema::SchemaRef; -use datatypes::types::{Int16Type, Int8Type, TimestampType, UInt16Type, UInt8Type}; +use datatypes::types::{Int16Type, Int8Type, TimeType, TimestampType, UInt16Type, UInt8Type}; use datatypes::value::Value; use datatypes::vectors::{ BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, - Int32Vector, Int64Vector, PrimitiveVector, StringVector, TimestampMicrosecondVector, + Int32Vector, Int64Vector, PrimitiveVector, StringVector, TimeMicrosecondVector, + TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, }; @@ -194,7 +196,26 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve Timestamp::new_nanosecond(*v) )) } - _ => unimplemented!("Implemented in #1961"), + ColumnDataType::TimeSecond => { + collect_values!(values.time_second_values, |v| ValueRef::Time( + Time::new_second(*v) + )) + } + ColumnDataType::TimeMillisecond => { + collect_values!(values.time_millisecond_values, |v| ValueRef::Time( + Time::new_millisecond(*v) + )) + } + ColumnDataType::TimeMicrosecond => { + collect_values!(values.time_millisecond_values, |v| ValueRef::Time( + Time::new_microsecond(*v) + )) + } + ColumnDataType::TimeNanosecond => { + collect_values!(values.time_millisecond_values, |v| ValueRef::Time( + Time::new_nanosecond(*v) + )) + } } } @@ -388,6 +409,21 @@ fn values_to_vector(data_type: &ConcreteDataType, values: Values) -> VectorRef { values.ts_nanosecond_values, )), }, + ConcreteDataType::Time(unit) => match unit { + TimeType::Second(_) => Arc::new(TimeSecondVector::from_iter_values( + values.time_second_values.iter().map(|x| *x as i32), + )), + TimeType::Millisecond(_) => Arc::new(TimeMillisecondVector::from_iter_values( + values.time_millisecond_values.iter().map(|x| *x as i32), + )), + TimeType::Microsecond(_) => Arc::new(TimeMicrosecondVector::from_vec( + values.time_microsecond_values, + )), + TimeType::Nanosecond(_) => Arc::new(TimeNanosecondVector::from_vec( + values.time_nanosecond_values, + )), + }, + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } @@ -496,6 +532,27 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { .into_iter() .map(|v| Value::Timestamp(Timestamp::new_nanosecond(v))) .collect(), + ConcreteDataType::Time(TimeType::Second(_)) => values + .time_second_values + .into_iter() + .map(|v| Value::Time(Time::new_second(v))) + .collect(), + ConcreteDataType::Time(TimeType::Millisecond(_)) => values + .time_millisecond_values + .into_iter() + .map(|v| Value::Time(Time::new_millisecond(v))) + .collect(), + ConcreteDataType::Time(TimeType::Microsecond(_)) => values + .time_microsecond_values + .into_iter() + .map(|v| Value::Time(Time::new_microsecond(v))) + .collect(), + ConcreteDataType::Time(TimeType::Nanosecond(_)) => values + .time_nanosecond_values + .into_iter() + .map(|v| Value::Time(Time::new_nanosecond(v))) + .collect(), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { unreachable!() } @@ -516,10 +573,13 @@ mod tests { use api::v1::{Column, ColumnDataType}; use common_base::BitVec; use common_catalog::consts::MITO_ENGINE; - use common_time::timestamp::Timestamp; + use common_time::timestamp::{TimeUnit, Timestamp}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; - use datatypes::types::{TimestampMillisecondType, TimestampSecondType, TimestampType}; + use datatypes::types::{ + TimeMillisecondType, TimeSecondType, TimeType, TimestampMillisecondType, + TimestampSecondType, TimestampType, + }; use datatypes::value::Value; use paste::paste; use snafu::ResultExt; @@ -576,8 +636,8 @@ mod tests { ); let column_defs = create_expr.column_defs; - assert_eq!(column_defs[3].name, create_expr.time_index); - assert_eq!(4, column_defs.len()); + assert_eq!(column_defs[4].name, create_expr.time_index); + assert_eq!(5, column_defs.len()); assert_eq!( ConcreteDataType::string_datatype(), @@ -621,6 +681,20 @@ mod tests { ) ); + assert_eq!( + ConcreteDataType::time_datatype(TimeUnit::Millisecond), + ConcreteDataType::from( + ColumnDataTypeWrapper::try_new( + column_defs + .iter() + .find(|c| c.name == "time") + .unwrap() + .datatype + ) + .unwrap() + ) + ); + assert_eq!( ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::from( @@ -654,7 +728,7 @@ mod tests { let add_columns = find_new_columns(&schema, &insert_batch.0).unwrap().unwrap(); - assert_eq!(2, add_columns.add_columns.len()); + assert_eq!(3, add_columns.add_columns.len()); let host_column = &add_columns.add_columns[0]; assert!(host_column.is_key); @@ -676,6 +750,17 @@ mod tests { .unwrap() ) ); + + let time_column = &add_columns.add_columns[2]; + assert!(!time_column.is_key); + + assert_eq!( + ConcreteDataType::time_datatype(TimeUnit::Millisecond), + ConcreteDataType::from( + ColumnDataTypeWrapper::try_new(time_column.column_def.as_ref().unwrap().datatype) + .unwrap() + ) + ); } #[test] @@ -887,6 +972,39 @@ mod tests { assert_eq!(expect, actual); } + #[test] + fn test_convert_time_values() { + // second + let actual = convert_values( + &ConcreteDataType::Time(TimeType::Second(TimeSecondType)), + Values { + time_second_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Time(Time::new_second(1_i64)), + Value::Time(Time::new_second(2_i64)), + Value::Time(Time::new_second(3_i64)), + ]; + assert_eq!(expect, actual); + + // millisecond + let actual = convert_values( + &ConcreteDataType::Time(TimeType::Millisecond(TimeMillisecondType)), + Values { + time_millisecond_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Time(Time::new_millisecond(1_i64)), + Value::Time(Time::new_millisecond(2_i64)), + Value::Time(Time::new_millisecond(3_i64)), + ]; + assert_eq!(expect, actual); + } + #[test] fn test_is_null() { let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]); @@ -940,6 +1058,18 @@ mod tests { datatype: ColumnDataType::Float64 as i32, }; + let time_vals = column::Values { + time_millisecond_values: vec![100, 101], + ..Default::default() + }; + let time_column = Column { + column_name: "time".to_string(), + semantic_type: SemanticType::Field as i32, + values: Some(time_vals), + null_mask: vec![0], + datatype: ColumnDataType::TimeMillisecond as i32, + }; + let ts_vals = column::Values { ts_millisecond_values: vec![100, 101], ..Default::default() @@ -953,7 +1083,7 @@ mod tests { }; ( - vec![host_column, cpu_column, mem_column, ts_column], + vec![host_column, cpu_column, mem_column, time_column, ts_column], row_count, ) } diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 55e15adf77..21eed06c0c 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -14,10 +14,11 @@ use api::v1::column::Values; use common_base::BitVec; -use datatypes::types::{TimestampType, WrapperType}; +use datatypes::types::{TimeType, TimestampType, WrapperType}; use datatypes::vectors::{ BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, - Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, TimestampMicrosecondVector, + Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, TimeMicrosecondVector, + TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector, VectorRef, }; @@ -167,6 +168,30 @@ pub fn values(arrays: &[VectorRef]) -> Result { TimestampNanosecondVector, ts_nanosecond_values, |x| { x.into_native() } + ), + ( + ConcreteDataType::Time(TimeType::Second(_)), + TimeSecondVector, + time_second_values, + |x| { x.into_native() as i64 } + ), + ( + ConcreteDataType::Time(TimeType::Millisecond(_)), + TimeMillisecondVector, + time_millisecond_values, + |x| { x.into_native() as i64 } + ), + ( + ConcreteDataType::Time(TimeType::Microsecond(_)), + TimeMicrosecondVector, + time_microsecond_values, + |x| { x.into_native() } + ), + ( + ConcreteDataType::Time(TimeType::Nanosecond(_)), + TimeNanosecondVector, + time_nanosecond_values, + |x| { x.into_native() } ) ) } @@ -187,6 +212,16 @@ mod tests { assert_eq!(vec![1, 2, 3], values.i32_values); } + #[test] + fn test_convert_arrow_array_time_second() { + let array = TimeSecondVector::from(vec![Some(1), Some(2), None, Some(3)]); + let array: VectorRef = Arc::new(array); + + let values = values(&[array]).unwrap(); + + assert_eq!(vec![1, 2, 3], values.time_second_values); + } + #[test] fn test_convert_arrow_arrays_string() { let array = StringVector::from(vec![ diff --git a/src/common/time/Cargo.toml b/src/common/time/Cargo.toml index c8c93bec1d..e075622375 100644 --- a/src/common/time/Cargo.toml +++ b/src/common/time/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +arrow.workspace = true chrono.workspace = true chrono-tz = "0.8" common-error = { path = "../error" } diff --git a/src/common/time/src/lib.rs b/src/common/time/src/lib.rs index 76558e7610..1328d08a23 100644 --- a/src/common/time/src/lib.rs +++ b/src/common/time/src/lib.rs @@ -16,6 +16,7 @@ pub mod date; pub mod datetime; pub mod error; pub mod range; +pub mod time; pub mod timestamp; pub mod timestamp_millis; pub mod timezone; diff --git a/src/common/time/src/time.rs b/src/common/time/src/time.rs new file mode 100644 index 0000000000..4ee3278c16 --- /dev/null +++ b/src/common/time/src/time.rs @@ -0,0 +1,412 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::hash::{Hash, Hasher}; + +use chrono::offset::Local; +use chrono::{NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc}; +use serde::{Deserialize, Serialize}; + +use crate::timestamp::TimeUnit; +use crate::timezone::TimeZone; + +/// Time value, represents the elapsed time since midnight in the unit of `TimeUnit`. +#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)] +pub struct Time { + value: i64, + unit: TimeUnit, +} + +impl Time { + /// Creates the time by value and `TimeUnit`. + pub fn new(value: i64, unit: TimeUnit) -> Self { + Self { value, unit } + } + + /// Creates the time in nanosecond. + pub fn new_nanosecond(value: i64) -> Self { + Self { + value, + unit: TimeUnit::Nanosecond, + } + } + + /// Creates the time in second. + pub fn new_second(value: i64) -> Self { + Self { + value, + unit: TimeUnit::Second, + } + } + + /// Creates the time in millisecond. + pub fn new_millisecond(value: i64) -> Self { + Self { + value, + unit: TimeUnit::Millisecond, + } + } + + /// Creates the time in microsecond. + pub fn new_microsecond(value: i64) -> Self { + Self { + value, + unit: TimeUnit::Microsecond, + } + } + + /// Returns the `TimeUnit` of the time. + pub fn unit(&self) -> &TimeUnit { + &self.unit + } + + /// Returns the value of the time. + pub fn value(&self) -> i64 { + self.value + } + + /// Split a [Time] into seconds part and nanoseconds part. + /// Notice the seconds part of split result is always rounded down to floor. + fn split(&self) -> (i64, u32) { + let sec_mul = (TimeUnit::Second.factor() / self.unit.factor()) as i64; + let nsec_mul = (self.unit.factor() / TimeUnit::Nanosecond.factor()) as i64; + + let sec_div = self.value.div_euclid(sec_mul); + let sec_mod = self.value.rem_euclid(sec_mul); + // safety: the max possible value of `sec_mod` is 999,999,999 + let nsec = u32::try_from(sec_mod * nsec_mul).unwrap(); + (sec_div, nsec) + } + + /// Format Time to ISO8601 string. If the time exceeds what chrono time can + /// represent, this function simply print the time unit and value in plain string. + pub fn to_iso8601_string(&self) -> String { + self.as_formatted_string("%H:%M:%S%.f%z", None) + } + + /// Format Time for local timezone. + pub fn to_local_string(&self) -> String { + self.as_formatted_string("%H:%M:%S%.f", None) + } + + /// Format Time for given timezone. + /// When timezone is None, using local time by default. + pub fn to_timezone_aware_string(&self, tz: Option) -> String { + self.as_formatted_string("%H:%M:%S%.f", tz) + } + + fn as_formatted_string(self, pattern: &str, timezone: Option) -> String { + if let Some(time) = self.to_chrono_time() { + let date = Utc::now().date_naive(); + let datetime = NaiveDateTime::new(date, time); + + match timezone { + Some(TimeZone::Offset(offset)) => { + format!("{}", offset.from_utc_datetime(&datetime).format(pattern)) + } + Some(TimeZone::Named(tz)) => { + format!("{}", tz.from_utc_datetime(&datetime).format(pattern)) + } + None => { + let local = Local {}; + format!("{}", local.from_utc_datetime(&datetime).format(pattern)) + } + } + } else { + format!("[Time{}: {}]", self.unit, self.value) + } + } + + /// Cast the [Time] into chrono NaiveDateTime + pub fn to_chrono_time(&self) -> Option { + let (sec, nsec) = self.split(); + if let Ok(sec) = u32::try_from(sec) { + NaiveTime::from_num_seconds_from_midnight_opt(sec, nsec) + } else { + None + } + } +} + +impl From for Time { + fn from(v: i64) -> Self { + Self { + value: v, + unit: TimeUnit::Millisecond, + } + } +} + +impl From