feat: impl time type (#1961)

* chore: remove useless Option type in plugins (#1544)

Co-authored-by: paomian <qtang@greptime.com>

* chore: remove useless Option type in plugins (#1544)

Co-authored-by: paomian <qtang@greptime.com>

* chore: remove useless Option type in plugins (#1544)

Co-authored-by: paomian <qtang@greptime.com>

* chore: remove useless Option type in plugins (#1544)

Co-authored-by: paomian <qtang@greptime.com>

* feat: first commit for time type

* feat: impl time type

* fix: arrow vectors type conversion

* test: add time test

* test: adds more tests for time type

* chore: style

* fix: sqlness result

* Update src/common/time/src/time.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* chore: CR comments

---------

Co-authored-by: localhost <xpaomian@gmail.com>
Co-authored-by: paomian <qtang@greptime.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
dennis zhuang
2023-07-18 10:55:28 +08:00
committed by GitHub
parent 6811acb314
commit b81570b99a
29 changed files with 1564 additions and 45 deletions

3
Cargo.lock generated
View File

@@ -1975,6 +1975,7 @@ dependencies = [
name = "common-time"
version = "0.3.2"
dependencies = [
"arrow",
"chrono",
"chrono-tz 0.8.2",
"common-error",
@@ -4137,7 +4138,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/MichaelScofield/greptime-proto.git?rev=a98b81b660080e13f01b9cedc9778de9aa1c19b9#a98b81b660080e13f01b9cedc9778de9aa1c19b9"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=bec16e50c9322758111f73e42fb5d377c7235e05#bec16e50c9322758111f73e42fb5d377c7235e05"
dependencies = [
"prost",
"serde",

View File

@@ -74,7 +74,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/MichaelScofield/greptime-proto.git", rev = "a98b81b660080e13f01b9cedc9778de9aa1c19b9" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "bec16e50c9322758111f73e42fb5d377c7235e05" }
itertools = "0.10"
lazy_static = "1.4"
parquet = "40.0"

View File

@@ -15,7 +15,7 @@
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::types::{TimeType, TimestampType};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use greptime_proto::v1::ddl_request::Expr;
@@ -71,7 +71,10 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
ColumnDataType::TimestampNanosecond => {
ConcreteDataType::timestamp_nanosecond_datatype()
}
_ => unimplemented!("Implemented in #1961"),
ColumnDataType::TimeSecond => ConcreteDataType::time_second_datatype(),
ColumnDataType::TimeMillisecond => ConcreteDataType::time_millisecond_datatype(),
ColumnDataType::TimeMicrosecond => ConcreteDataType::time_microsecond_datatype(),
ColumnDataType::TimeNanosecond => ConcreteDataType::time_nanosecond_datatype(),
}
}
}
@@ -96,12 +99,18 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
ConcreteDataType::Timestamp(unit) => match unit {
ConcreteDataType::Timestamp(t) => match t {
TimestampType::Second(_) => ColumnDataType::TimestampSecond,
TimestampType::Millisecond(_) => ColumnDataType::TimestampMillisecond,
TimestampType::Microsecond(_) => ColumnDataType::TimestampMicrosecond,
TimestampType::Nanosecond(_) => ColumnDataType::TimestampNanosecond,
},
ConcreteDataType::Time(t) => match t {
TimeType::Second(_) => ColumnDataType::TimeSecond,
TimeType::Millisecond(_) => ColumnDataType::TimeMillisecond,
TimeType::Microsecond(_) => ColumnDataType::TimeMicrosecond,
TimeType::Nanosecond(_) => ColumnDataType::TimeNanosecond,
},
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => {
@@ -190,7 +199,22 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
ts_nanosecond_values: Vec::with_capacity(capacity),
..Default::default()
},
_ => unimplemented!("Implemented in #1961"),
ColumnDataType::TimeSecond => Values {
time_second_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimeMillisecond => Values {
time_millisecond_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimeMicrosecond => Values {
time_microsecond_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimeNanosecond => Values {
time_nanosecond_values: Vec::with_capacity(capacity),
..Default::default()
},
}
}
@@ -225,6 +249,12 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()),
},
Value::Time(val) => match val.unit() {
TimeUnit::Second => values.time_second_values.push(val.value()),
TimeUnit::Millisecond => values.time_millisecond_values.push(val.value()),
TimeUnit::Microsecond => values.time_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.time_nanosecond_values.push(val.value()),
},
Value::List(_) => unreachable!(),
});
column.null_mask = null_mask.into_vec();
@@ -268,7 +298,8 @@ mod tests {
use std::sync::Arc;
use datatypes::vectors::{
BooleanVector, TimestampMicrosecondVector, TimestampMillisecondVector,
BooleanVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector,
TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector,
};
@@ -331,6 +362,10 @@ mod tests {
let values = values_with_capacity(ColumnDataType::TimestampMillisecond, 2);
let values = values.ts_millisecond_values;
assert_eq!(2, values.capacity());
let values = values_with_capacity(ColumnDataType::TimeMillisecond, 2);
let values = values.time_millisecond_values;
assert_eq!(2, values.capacity());
}
#[test]
@@ -399,6 +434,10 @@ mod tests {
ConcreteDataType::timestamp_millisecond_datatype(),
ColumnDataTypeWrapper(ColumnDataType::TimestampMillisecond).into()
);
assert_eq!(
ConcreteDataType::time_datatype(TimeUnit::Millisecond),
ColumnDataTypeWrapper(ColumnDataType::TimeMillisecond).into()
);
}
#[test]
@@ -527,6 +566,47 @@ mod tests {
);
}
#[test]
fn test_column_put_time_values() {
let mut column = Column {
column_name: "test".to_string(),
semantic_type: 0,
values: Some(Values {
..Default::default()
}),
null_mask: vec![],
datatype: 0,
};
let vector = Arc::new(TimeNanosecondVector::from_vec(vec![1, 2, 3]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![1, 2, 3],
column.values.as_ref().unwrap().time_nanosecond_values
);
let vector = Arc::new(TimeMillisecondVector::from_vec(vec![4, 5, 6]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![4, 5, 6],
column.values.as_ref().unwrap().time_millisecond_values
);
let vector = Arc::new(TimeMicrosecondVector::from_vec(vec![7, 8, 9]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![7, 8, 9],
column.values.as_ref().unwrap().time_microsecond_values
);
let vector = Arc::new(TimeSecondVector::from_vec(vec![10, 11, 12]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![10, 11, 12],
column.values.as_ref().unwrap().time_second_values
);
}
#[test]
fn test_column_put_vector() {
use crate::v1::column::SemanticType;

View File

@@ -22,17 +22,19 @@ use api::v1::{
InsertRequest as GrpcInsertRequest,
};
use common_base::BitVec;
use common_time::time::Time;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime};
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ValueRef, VectorRef};
use datatypes::scalars::ScalarVector;
use datatypes::schema::SchemaRef;
use datatypes::types::{Int16Type, Int8Type, TimestampType, UInt16Type, UInt8Type};
use datatypes::types::{Int16Type, Int8Type, TimeType, TimestampType, UInt16Type, UInt8Type};
use datatypes::value::Value;
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int32Vector, Int64Vector, PrimitiveVector, StringVector, TimestampMicrosecondVector,
Int32Vector, Int64Vector, PrimitiveVector, StringVector, TimeMicrosecondVector,
TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
UInt64Vector,
};
@@ -194,7 +196,26 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
Timestamp::new_nanosecond(*v)
))
}
_ => unimplemented!("Implemented in #1961"),
ColumnDataType::TimeSecond => {
collect_values!(values.time_second_values, |v| ValueRef::Time(
Time::new_second(*v)
))
}
ColumnDataType::TimeMillisecond => {
collect_values!(values.time_millisecond_values, |v| ValueRef::Time(
Time::new_millisecond(*v)
))
}
ColumnDataType::TimeMicrosecond => {
collect_values!(values.time_millisecond_values, |v| ValueRef::Time(
Time::new_microsecond(*v)
))
}
ColumnDataType::TimeNanosecond => {
collect_values!(values.time_millisecond_values, |v| ValueRef::Time(
Time::new_nanosecond(*v)
))
}
}
}
@@ -388,6 +409,21 @@ fn values_to_vector(data_type: &ConcreteDataType, values: Values) -> VectorRef {
values.ts_nanosecond_values,
)),
},
ConcreteDataType::Time(unit) => match unit {
TimeType::Second(_) => Arc::new(TimeSecondVector::from_iter_values(
values.time_second_values.iter().map(|x| *x as i32),
)),
TimeType::Millisecond(_) => Arc::new(TimeMillisecondVector::from_iter_values(
values.time_millisecond_values.iter().map(|x| *x as i32),
)),
TimeType::Microsecond(_) => Arc::new(TimeMicrosecondVector::from_vec(
values.time_microsecond_values,
)),
TimeType::Nanosecond(_) => Arc::new(TimeNanosecondVector::from_vec(
values.time_nanosecond_values,
)),
},
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()
}
@@ -496,6 +532,27 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_nanosecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Second(_)) => values
.time_second_values
.into_iter()
.map(|v| Value::Time(Time::new_second(v)))
.collect(),
ConcreteDataType::Time(TimeType::Millisecond(_)) => values
.time_millisecond_values
.into_iter()
.map(|v| Value::Time(Time::new_millisecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Microsecond(_)) => values
.time_microsecond_values
.into_iter()
.map(|v| Value::Time(Time::new_microsecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Nanosecond(_)) => values
.time_nanosecond_values
.into_iter()
.map(|v| Value::Time(Time::new_nanosecond(v)))
.collect(),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()
}
@@ -516,10 +573,13 @@ mod tests {
use api::v1::{Column, ColumnDataType};
use common_base::BitVec;
use common_catalog::consts::MITO_ENGINE;
use common_time::timestamp::Timestamp;
use common_time::timestamp::{TimeUnit, Timestamp};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use datatypes::types::{TimestampMillisecondType, TimestampSecondType, TimestampType};
use datatypes::types::{
TimeMillisecondType, TimeSecondType, TimeType, TimestampMillisecondType,
TimestampSecondType, TimestampType,
};
use datatypes::value::Value;
use paste::paste;
use snafu::ResultExt;
@@ -576,8 +636,8 @@ mod tests {
);
let column_defs = create_expr.column_defs;
assert_eq!(column_defs[3].name, create_expr.time_index);
assert_eq!(4, column_defs.len());
assert_eq!(column_defs[4].name, create_expr.time_index);
assert_eq!(5, column_defs.len());
assert_eq!(
ConcreteDataType::string_datatype(),
@@ -621,6 +681,20 @@ mod tests {
)
);
assert_eq!(
ConcreteDataType::time_datatype(TimeUnit::Millisecond),
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(
column_defs
.iter()
.find(|c| c.name == "time")
.unwrap()
.datatype
)
.unwrap()
)
);
assert_eq!(
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::from(
@@ -654,7 +728,7 @@ mod tests {
let add_columns = find_new_columns(&schema, &insert_batch.0).unwrap().unwrap();
assert_eq!(2, add_columns.add_columns.len());
assert_eq!(3, add_columns.add_columns.len());
let host_column = &add_columns.add_columns[0];
assert!(host_column.is_key);
@@ -676,6 +750,17 @@ mod tests {
.unwrap()
)
);
let time_column = &add_columns.add_columns[2];
assert!(!time_column.is_key);
assert_eq!(
ConcreteDataType::time_datatype(TimeUnit::Millisecond),
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(time_column.column_def.as_ref().unwrap().datatype)
.unwrap()
)
);
}
#[test]
@@ -887,6 +972,39 @@ mod tests {
assert_eq!(expect, actual);
}
#[test]
fn test_convert_time_values() {
// second
let actual = convert_values(
&ConcreteDataType::Time(TimeType::Second(TimeSecondType)),
Values {
time_second_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Time(Time::new_second(1_i64)),
Value::Time(Time::new_second(2_i64)),
Value::Time(Time::new_second(3_i64)),
];
assert_eq!(expect, actual);
// millisecond
let actual = convert_values(
&ConcreteDataType::Time(TimeType::Millisecond(TimeMillisecondType)),
Values {
time_millisecond_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Time(Time::new_millisecond(1_i64)),
Value::Time(Time::new_millisecond(2_i64)),
Value::Time(Time::new_millisecond(3_i64)),
];
assert_eq!(expect, actual);
}
#[test]
fn test_is_null() {
let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]);
@@ -940,6 +1058,18 @@ mod tests {
datatype: ColumnDataType::Float64 as i32,
};
let time_vals = column::Values {
time_millisecond_values: vec![100, 101],
..Default::default()
};
let time_column = Column {
column_name: "time".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(time_vals),
null_mask: vec![0],
datatype: ColumnDataType::TimeMillisecond as i32,
};
let ts_vals = column::Values {
ts_millisecond_values: vec![100, 101],
..Default::default()
@@ -953,7 +1083,7 @@ mod tests {
};
(
vec![host_column, cpu_column, mem_column, ts_column],
vec![host_column, cpu_column, mem_column, time_column, ts_column],
row_count,
)
}

View File

@@ -14,10 +14,11 @@
use api::v1::column::Values;
use common_base::BitVec;
use datatypes::types::{TimestampType, WrapperType};
use datatypes::types::{TimeType, TimestampType, WrapperType};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, TimestampMicrosecondVector,
Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, TimeMicrosecondVector,
TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector,
UInt32Vector, UInt64Vector, UInt8Vector, VectorRef,
};
@@ -167,6 +168,30 @@ pub fn values(arrays: &[VectorRef]) -> Result<Values> {
TimestampNanosecondVector,
ts_nanosecond_values,
|x| { x.into_native() }
),
(
ConcreteDataType::Time(TimeType::Second(_)),
TimeSecondVector,
time_second_values,
|x| { x.into_native() as i64 }
),
(
ConcreteDataType::Time(TimeType::Millisecond(_)),
TimeMillisecondVector,
time_millisecond_values,
|x| { x.into_native() as i64 }
),
(
ConcreteDataType::Time(TimeType::Microsecond(_)),
TimeMicrosecondVector,
time_microsecond_values,
|x| { x.into_native() }
),
(
ConcreteDataType::Time(TimeType::Nanosecond(_)),
TimeNanosecondVector,
time_nanosecond_values,
|x| { x.into_native() }
)
)
}
@@ -187,6 +212,16 @@ mod tests {
assert_eq!(vec![1, 2, 3], values.i32_values);
}
#[test]
fn test_convert_arrow_array_time_second() {
let array = TimeSecondVector::from(vec![Some(1), Some(2), None, Some(3)]);
let array: VectorRef = Arc::new(array);
let values = values(&[array]).unwrap();
assert_eq!(vec![1, 2, 3], values.time_second_values);
}
#[test]
fn test_convert_arrow_arrays_string() {
let array = StringVector::from(vec![

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
arrow.workspace = true
chrono.workspace = true
chrono-tz = "0.8"
common-error = { path = "../error" }

View File

@@ -16,6 +16,7 @@ pub mod date;
pub mod datetime;
pub mod error;
pub mod range;
pub mod time;
pub mod timestamp;
pub mod timestamp_millis;
pub mod timezone;

412
src/common/time/src/time.rs Normal file
View File

@@ -0,0 +1,412 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use chrono::offset::Local;
use chrono::{NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc};
use serde::{Deserialize, Serialize};
use crate::timestamp::TimeUnit;
use crate::timezone::TimeZone;
/// Time value, represents the elapsed time since midnight in the unit of `TimeUnit`.
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
pub struct Time {
value: i64,
unit: TimeUnit,
}
impl Time {
/// Creates the time by value and `TimeUnit`.
pub fn new(value: i64, unit: TimeUnit) -> Self {
Self { value, unit }
}
/// Creates the time in nanosecond.
pub fn new_nanosecond(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Nanosecond,
}
}
/// Creates the time in second.
pub fn new_second(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Second,
}
}
/// Creates the time in millisecond.
pub fn new_millisecond(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Millisecond,
}
}
/// Creates the time in microsecond.
pub fn new_microsecond(value: i64) -> Self {
Self {
value,
unit: TimeUnit::Microsecond,
}
}
/// Returns the `TimeUnit` of the time.
pub fn unit(&self) -> &TimeUnit {
&self.unit
}
/// Returns the value of the time.
pub fn value(&self) -> i64 {
self.value
}
/// Split a [Time] into seconds part and nanoseconds part.
/// Notice the seconds part of split result is always rounded down to floor.
fn split(&self) -> (i64, u32) {
let sec_mul = (TimeUnit::Second.factor() / self.unit.factor()) as i64;
let nsec_mul = (self.unit.factor() / TimeUnit::Nanosecond.factor()) as i64;
let sec_div = self.value.div_euclid(sec_mul);
let sec_mod = self.value.rem_euclid(sec_mul);
// safety: the max possible value of `sec_mod` is 999,999,999
let nsec = u32::try_from(sec_mod * nsec_mul).unwrap();
(sec_div, nsec)
}
/// Format Time to ISO8601 string. If the time exceeds what chrono time can
/// represent, this function simply print the time unit and value in plain string.
pub fn to_iso8601_string(&self) -> String {
self.as_formatted_string("%H:%M:%S%.f%z", None)
}
/// Format Time for local timezone.
pub fn to_local_string(&self) -> String {
self.as_formatted_string("%H:%M:%S%.f", None)
}
/// Format Time for given timezone.
/// When timezone is None, using local time by default.
pub fn to_timezone_aware_string(&self, tz: Option<TimeZone>) -> String {
self.as_formatted_string("%H:%M:%S%.f", tz)
}
fn as_formatted_string(self, pattern: &str, timezone: Option<TimeZone>) -> String {
if let Some(time) = self.to_chrono_time() {
let date = Utc::now().date_naive();
let datetime = NaiveDateTime::new(date, time);
match timezone {
Some(TimeZone::Offset(offset)) => {
format!("{}", offset.from_utc_datetime(&datetime).format(pattern))
}
Some(TimeZone::Named(tz)) => {
format!("{}", tz.from_utc_datetime(&datetime).format(pattern))
}
None => {
let local = Local {};
format!("{}", local.from_utc_datetime(&datetime).format(pattern))
}
}
} else {
format!("[Time{}: {}]", self.unit, self.value)
}
}
/// Cast the [Time] into chrono NaiveDateTime
pub fn to_chrono_time(&self) -> Option<NaiveTime> {
let (sec, nsec) = self.split();
if let Ok(sec) = u32::try_from(sec) {
NaiveTime::from_num_seconds_from_midnight_opt(sec, nsec)
} else {
None
}
}
}
impl From<i64> for Time {
fn from(v: i64) -> Self {
Self {
value: v,
unit: TimeUnit::Millisecond,
}
}
}
impl From<Time> for i64 {
fn from(t: Time) -> Self {
t.value
}
}
impl From<Time> for i32 {
fn from(t: Time) -> Self {
t.value as i32
}
}
impl From<Time> for serde_json::Value {
fn from(d: Time) -> Self {
serde_json::Value::String(d.to_iso8601_string())
}
}
impl PartialOrd for Time {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Time {
fn cmp(&self, other: &Self) -> Ordering {
// fast path: most comparisons use the same unit.
if self.unit == other.unit {
return self.value.cmp(&other.value);
}
let (s_sec, s_nsec) = self.split();
let (o_sec, o_nsec) = other.split();
match s_sec.cmp(&o_sec) {
Ordering::Less => Ordering::Less,
Ordering::Greater => Ordering::Greater,
Ordering::Equal => s_nsec.cmp(&o_nsec),
}
}
}
impl PartialEq for Time {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl Eq for Time {}
impl Hash for Time {
fn hash<H: Hasher>(&self, state: &mut H) {
let (sec, nsec) = self.split();
state.write_i64(sec);
state.write_u32(nsec);
}
}
#[cfg(test)]
mod tests {
use std::collections::hash_map::DefaultHasher;
use serde_json::Value;
use super::*;
#[test]
fn test_time() {
let t = Time::new(1, TimeUnit::Millisecond);
assert_eq!(TimeUnit::Millisecond, *t.unit());
assert_eq!(1, t.value());
assert_eq!(Time::new(1000, TimeUnit::Microsecond), t);
assert!(t > Time::new(999, TimeUnit::Microsecond));
}
#[test]
fn test_cmp_time() {
let t1 = Time::new(0, TimeUnit::Millisecond);
let t2 = Time::new(0, TimeUnit::Second);
assert_eq!(t2, t1);
let t1 = Time::new(100_100, TimeUnit::Millisecond);
let t2 = Time::new(100, TimeUnit::Second);
assert!(t1 > t2);
let t1 = Time::new(10_010_001, TimeUnit::Millisecond);
let t2 = Time::new(100, TimeUnit::Second);
assert!(t1 > t2);
let t1 = Time::new(10_010_001, TimeUnit::Nanosecond);
let t2 = Time::new(100, TimeUnit::Second);
assert!(t1 < t2);
let t1 = Time::new(i64::MAX / 1000 * 1000, TimeUnit::Millisecond);
let t2 = Time::new(i64::MAX / 1000, TimeUnit::Second);
assert_eq!(t2, t1);
let t1 = Time::new(i64::MAX, TimeUnit::Millisecond);
let t2 = Time::new(i64::MAX / 1000 + 1, TimeUnit::Second);
assert!(t2 > t1);
let t1 = Time::new(i64::MAX, TimeUnit::Millisecond);
let t2 = Time::new(i64::MAX / 1000, TimeUnit::Second);
assert!(t2 < t1);
let t1 = Time::new(10_010_001, TimeUnit::Millisecond);
let t2 = Time::new(100, TimeUnit::Second);
assert!(t1 > t2);
let t1 = Time::new(-100 * 10_001, TimeUnit::Millisecond);
let t2 = Time::new(-100, TimeUnit::Second);
assert!(t2 > t1);
}
fn check_hash_eq(t1: Time, t2: Time) {
let mut hasher = DefaultHasher::new();
t1.hash(&mut hasher);
let t1_hash = hasher.finish();
let mut hasher = DefaultHasher::new();
t2.hash(&mut hasher);
let t2_hash = hasher.finish();
assert_eq!(t2_hash, t1_hash);
}
#[test]
fn test_hash() {
check_hash_eq(
Time::new(0, TimeUnit::Millisecond),
Time::new(0, TimeUnit::Second),
);
check_hash_eq(
Time::new(1000, TimeUnit::Millisecond),
Time::new(1, TimeUnit::Second),
);
check_hash_eq(
Time::new(1_000_000, TimeUnit::Microsecond),
Time::new(1, TimeUnit::Second),
);
check_hash_eq(
Time::new(1_000_000_000, TimeUnit::Nanosecond),
Time::new(1, TimeUnit::Second),
);
}
#[test]
pub fn test_from_i64() {
let t: Time = 42.into();
assert_eq!(42, t.value());
assert_eq!(TimeUnit::Millisecond, *t.unit());
}
#[test]
fn test_to_iso8601_string() {
std::env::set_var("TZ", "Asia/Shanghai");
let time_millis = 1000001;
let ts = Time::new_millisecond(time_millis);
assert_eq!("08:16:40.001+0800", ts.to_iso8601_string());
let time_millis = 1000;
let ts = Time::new_millisecond(time_millis);
assert_eq!("08:00:01+0800", ts.to_iso8601_string());
let time_millis = 1;
let ts = Time::new_millisecond(time_millis);
assert_eq!("08:00:00.001+0800", ts.to_iso8601_string());
let time_seconds = 9 * 3600;
let ts = Time::new_second(time_seconds);
assert_eq!("17:00:00+0800", ts.to_iso8601_string());
let time_seconds = 23 * 3600;
let ts = Time::new_second(time_seconds);
assert_eq!("07:00:00+0800", ts.to_iso8601_string());
}
#[test]
fn test_serialize_to_json_value() {
std::env::set_var("TZ", "Asia/Shanghai");
assert_eq!(
"08:00:01+0800",
match serde_json::Value::from(Time::new(1, TimeUnit::Second)) {
Value::String(s) => s,
_ => unreachable!(),
}
);
assert_eq!(
"08:00:00.001+0800",
match serde_json::Value::from(Time::new(1, TimeUnit::Millisecond)) {
Value::String(s) => s,
_ => unreachable!(),
}
);
assert_eq!(
"08:00:00.000001+0800",
match serde_json::Value::from(Time::new(1, TimeUnit::Microsecond)) {
Value::String(s) => s,
_ => unreachable!(),
}
);
assert_eq!(
"08:00:00.000000001+0800",
match serde_json::Value::from(Time::new(1, TimeUnit::Nanosecond)) {
Value::String(s) => s,
_ => unreachable!(),
}
);
}
#[test]
fn test_to_timezone_aware_string() {
std::env::set_var("TZ", "Asia/Shanghai");
assert_eq!(
"08:00:00.001",
Time::new(1, TimeUnit::Millisecond).to_timezone_aware_string(None)
);
assert_eq!(
"08:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("SYSTEM").unwrap())
);
assert_eq!(
"08:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("+08:00").unwrap())
);
assert_eq!(
"07:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("+07:00").unwrap())
);
assert_eq!(
"23:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("-01:00").unwrap())
);
assert_eq!(
"08:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("Asia/Shanghai").unwrap())
);
assert_eq!(
"00:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("UTC").unwrap())
);
assert_eq!(
"02:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("Europe/Berlin").unwrap())
);
assert_eq!(
"03:00:00.001",
Time::new(1, TimeUnit::Millisecond)
.to_timezone_aware_string(TimeZone::from_tz_string("Europe/Moscow").unwrap())
);
}
}

View File

@@ -19,6 +19,7 @@ use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::time::Duration;
use arrow::datatypes::TimeUnit as ArrowTimeUnit;
use chrono::offset::Local;
use chrono::{DateTime, LocalResult, NaiveDateTime, TimeZone as ChronoTimeZone, Utc};
use serde::{Deserialize, Serialize};
@@ -308,6 +309,23 @@ pub enum TimeUnit {
Nanosecond,
}
impl From<&ArrowTimeUnit> for TimeUnit {
fn from(unit: &ArrowTimeUnit) -> Self {
match unit {
ArrowTimeUnit::Second => Self::Second,
ArrowTimeUnit::Millisecond => Self::Millisecond,
ArrowTimeUnit::Microsecond => Self::Microsecond,
ArrowTimeUnit::Nanosecond => Self::Nanosecond,
}
}
}
impl From<ArrowTimeUnit> for TimeUnit {
fn from(unit: ArrowTimeUnit) -> Self {
(&unit).into()
}
}
impl Display for TimeUnit {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -345,6 +363,15 @@ impl TimeUnit {
TimeUnit::Nanosecond => "ns",
}
}
pub fn as_arrow_time_unit(&self) -> ArrowTimeUnit {
match self {
Self::Second => ArrowTimeUnit::Second,
Self::Millisecond => ArrowTimeUnit::Millisecond,
Self::Microsecond => ArrowTimeUnit::Microsecond,
Self::Nanosecond => ArrowTimeUnit::Nanosecond,
}
}
}
impl PartialOrd for Timestamp {
@@ -1010,4 +1037,21 @@ mod tests {
.to_timezone_aware_string(TimeZone::from_tz_string("Europe/Moscow").unwrap())
);
}
#[test]
fn test_from_arrow_time_unit() {
assert_eq!(TimeUnit::Second, TimeUnit::from(ArrowTimeUnit::Second));
assert_eq!(
TimeUnit::Millisecond,
TimeUnit::from(ArrowTimeUnit::Millisecond)
);
assert_eq!(
TimeUnit::Microsecond,
TimeUnit::from(ArrowTimeUnit::Microsecond)
);
assert_eq!(
TimeUnit::Nanosecond,
TimeUnit::from(ArrowTimeUnit::Nanosecond)
);
}
}

View File

@@ -24,8 +24,8 @@ use crate::error::{self, Error, Result};
use crate::type_id::LogicalTypeId;
use crate::types::{
BinaryType, BooleanType, DateTimeType, DateType, DictionaryType, Float32Type, Float64Type,
Int16Type, Int32Type, Int64Type, Int8Type, ListType, NullType, StringType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
Int16Type, Int32Type, Int64Type, Int8Type, ListType, NullType, StringType, TimeMillisecondType,
TimeType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, TimestampType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use crate::value::Value;
@@ -53,10 +53,11 @@ pub enum ConcreteDataType {
Binary(BinaryType),
String(StringType),
// Date types:
// Date and time types:
Date(DateType),
DateTime(DateTimeType),
Timestamp(TimestampType),
Time(TimeType),
// Compound types:
List(ListType),
@@ -83,6 +84,7 @@ impl fmt::Display for ConcreteDataType {
ConcreteDataType::Date(_) => write!(f, "Date"),
ConcreteDataType::DateTime(_) => write!(f, "DateTime"),
ConcreteDataType::Timestamp(_) => write!(f, "Timestamp"),
ConcreteDataType::Time(_) => write!(f, "Time"),
ConcreteDataType::List(_) => write!(f, "List"),
ConcreteDataType::Dictionary(_) => write!(f, "Dictionary"),
}
@@ -110,6 +112,7 @@ impl ConcreteDataType {
| ConcreteDataType::Date(_)
| ConcreteDataType::DateTime(_)
| ConcreteDataType::Timestamp(_)
| ConcreteDataType::Time(_)
)
}
@@ -123,6 +126,7 @@ impl ConcreteDataType {
| ConcreteDataType::Date(_)
| ConcreteDataType::DateTime(_)
| ConcreteDataType::Timestamp(_)
| ConcreteDataType::Time(_)
)
}
@@ -181,6 +185,15 @@ impl ConcreteDataType {
_ => None,
}
}
/// Try to cast data type as a [`TimeType`].
pub fn as_time(&self) -> Option<TimeType> {
match self {
ConcreteDataType::Int64(_) => Some(TimeType::Millisecond(TimeMillisecondType)),
ConcreteDataType::Time(t) => Some(*t),
_ => None,
}
}
}
impl From<&ConcreteDataType> for ConcreteDataType {
@@ -219,6 +232,8 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
let value_type = ConcreteDataType::from_arrow_type(value_type);
Self::Dictionary(DictionaryType::new(key_type, value_type))
}
ArrowDataType::Time32(u) => ConcreteDataType::Time(TimeType::from_unit(u.into())),
ArrowDataType::Time64(u) => ConcreteDataType::Time(TimeType::from_unit(u.into())),
_ => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: dt.clone(),
@@ -271,6 +286,31 @@ impl ConcreteDataType {
ConcreteDataType::Timestamp(TimestampType::Nanosecond(TimestampNanosecondType::default()))
}
/// Returns the time data type with `TimeUnit`.
pub fn time_datatype(unit: TimeUnit) -> Self {
ConcreteDataType::Time(TimeType::from_unit(unit))
}
/// Creates a [Time(TimeSecondType)] datatype.
pub fn time_second_datatype() -> Self {
Self::time_datatype(TimeUnit::Second)
}
/// Creates a [Time(TimeMillisecondType)] datatype.
pub fn time_millisecond_datatype() -> Self {
Self::time_datatype(TimeUnit::Millisecond)
}
/// Creates a [Time(TimeMicrosecond)] datatype.
pub fn time_microsecond_datatype() -> Self {
Self::time_datatype(TimeUnit::Microsecond)
}
/// Creates a [Time(TimeNanosecond)] datatype.
pub fn time_nanosecond_datatype() -> Self {
Self::time_datatype(TimeUnit::Nanosecond)
}
pub fn timestamp_datatype(unit: TimeUnit) -> Self {
match unit {
TimeUnit::Second => Self::timestamp_second_datatype(),
@@ -464,6 +504,10 @@ mod tests {
assert!(!ConcreteDataType::string_datatype().is_timestamp_compatible());
assert!(!ConcreteDataType::int32_datatype().is_timestamp_compatible());
assert!(!ConcreteDataType::uint64_datatype().is_timestamp_compatible());
assert!(!ConcreteDataType::time_second_datatype().is_timestamp_compatible());
assert!(!ConcreteDataType::time_millisecond_datatype().is_timestamp_compatible());
assert!(!ConcreteDataType::time_microsecond_datatype().is_timestamp_compatible());
assert!(!ConcreteDataType::time_nanosecond_datatype().is_timestamp_compatible());
}
#[test]
@@ -497,6 +541,10 @@ mod tests {
assert!(ConcreteDataType::timestamp_millisecond_datatype().is_stringifiable());
assert!(ConcreteDataType::timestamp_microsecond_datatype().is_stringifiable());
assert!(ConcreteDataType::timestamp_nanosecond_datatype().is_stringifiable());
assert!(ConcreteDataType::time_second_datatype().is_stringifiable());
assert!(ConcreteDataType::time_millisecond_datatype().is_stringifiable());
assert!(ConcreteDataType::time_microsecond_datatype().is_stringifiable());
assert!(ConcreteDataType::time_nanosecond_datatype().is_stringifiable());
}
#[test]
@@ -511,6 +559,10 @@ mod tests {
assert!(ConcreteDataType::timestamp_millisecond_datatype().is_signed());
assert!(ConcreteDataType::timestamp_microsecond_datatype().is_signed());
assert!(ConcreteDataType::timestamp_nanosecond_datatype().is_signed());
assert!(ConcreteDataType::time_second_datatype().is_signed());
assert!(ConcreteDataType::time_millisecond_datatype().is_signed());
assert!(ConcreteDataType::time_microsecond_datatype().is_signed());
assert!(ConcreteDataType::time_nanosecond_datatype().is_signed());
assert!(!ConcreteDataType::uint8_datatype().is_signed());
assert!(!ConcreteDataType::uint16_datatype().is_signed());
@@ -533,6 +585,10 @@ mod tests {
assert!(!ConcreteDataType::timestamp_millisecond_datatype().is_unsigned());
assert!(!ConcreteDataType::timestamp_microsecond_datatype().is_unsigned());
assert!(!ConcreteDataType::timestamp_nanosecond_datatype().is_unsigned());
assert!(!ConcreteDataType::time_second_datatype().is_unsigned());
assert!(!ConcreteDataType::time_millisecond_datatype().is_unsigned());
assert!(!ConcreteDataType::time_microsecond_datatype().is_unsigned());
assert!(!ConcreteDataType::time_nanosecond_datatype().is_unsigned());
assert!(ConcreteDataType::uint8_datatype().is_unsigned());
assert!(ConcreteDataType::uint16_datatype().is_unsigned());
@@ -634,5 +690,6 @@ mod tests {
ConcreteDataType::from_arrow_type(&ArrowDataType::Date32).to_string(),
"Date"
);
assert_eq!(ConcreteDataType::time_second_datatype().to_string(), "Time");
}
}

View File

@@ -77,6 +77,12 @@ pub enum Error {
#[snafu(display("{}", msg))]
CastType { msg: String, location: Location },
#[snafu(display("Failed to cast arrow time i32 type into i64"))]
CastTimeType {
source: std::num::TryFromIntError,
location: Location,
},
#[snafu(display("Arrow failed to compute, source: {}", source))]
ArrowCompute {
source: arrow::error::ArrowError,

View File

@@ -22,6 +22,7 @@ pub mod prelude;
pub mod scalars;
pub mod schema;
pub mod serialize;
pub mod time;
pub mod timestamp;
pub mod type_id;
pub mod types;

156
src/datatypes/src/time.rs Normal file
View File

@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use paste::paste;
use serde::{Deserialize, Serialize};
use crate::prelude::{Scalar, Value, ValueRef};
use crate::scalars::ScalarRef;
use crate::types::{
TimeMicrosecondType, TimeMillisecondType, TimeNanosecondType, TimeSecondType, WrapperType,
};
use crate::vectors::{
TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector,
};
macro_rules! define_time_with_unit {
($unit: ident, $native_ty: ty) => {
paste! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct [<Time $unit>](pub Time);
impl [<Time $unit>] {
pub fn new(val: i64) -> Self {
Self(Time::new(val, TimeUnit::$unit))
}
}
impl Default for [<Time $unit>] {
fn default() -> Self {
Self::new(0)
}
}
impl From<[<Time $unit>]> for Value {
fn from(t: [<Time $unit>]) -> Value {
Value::Time(t.0)
}
}
impl From<[<Time $unit>]> for serde_json::Value {
fn from(t: [<Time $unit>]) -> Self {
t.0.into()
}
}
impl From<[<Time $unit>]> for ValueRef<'static> {
fn from(t: [<Time $unit>]) -> Self {
ValueRef::Time(t.0)
}
}
impl Scalar for [<Time $unit>] {
type VectorType = [<Time $unit Vector>];
type RefType<'a> = [<Time $unit>];
fn as_scalar_ref(&self) -> Self::RefType<'_> {
*self
}
fn upcast_gat<'short, 'long: 'short>(
long: Self::RefType<'long>,
) -> Self::RefType<'short> {
long
}
}
impl<'a> ScalarRef<'a> for [<Time $unit>] {
type ScalarType = [<Time $unit>];
fn to_owned_scalar(&self) -> Self::ScalarType {
*self
}
}
impl WrapperType for [<Time $unit>] {
type LogicalType = [<Time $unit Type>];
type Native = $native_ty;
fn from_native(value: Self::Native) -> Self {
Self::new(value.into())
}
fn into_native(self) -> Self::Native {
self.0.into()
}
}
impl From<i64> for [<Time $unit>] {
fn from(val: i64) -> Self {
[<Time $unit>]::from_native(val as $native_ty)
}
}
impl From<[<Time $unit>]> for i64 {
fn from(val: [<Time $unit>]) -> Self {
val.0.value()
}
}
}
};
}
define_time_with_unit!(Second, i32);
define_time_with_unit!(Millisecond, i32);
define_time_with_unit!(Microsecond, i64);
define_time_with_unit!(Nanosecond, i64);
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_to_serde_json_value() {
std::env::set_var("TZ", "Asia/Shanghai");
let time = TimeSecond::new(123);
let val = serde_json::Value::from(time);
match val {
serde_json::Value::String(s) => {
assert_eq!("08:02:03+0800", s);
}
_ => unreachable!(),
}
}
#[test]
fn test_time_scalar() {
let time = TimeSecond::new(123);
assert_eq!(time, time.as_scalar_ref());
assert_eq!(time, time.to_owned_scalar());
assert_eq!(123, time.into_native());
let time = TimeMillisecond::new(123);
assert_eq!(time, time.as_scalar_ref());
assert_eq!(time, time.to_owned_scalar());
assert_eq!(123, time.into_native());
let time = TimeMicrosecond::new(123);
assert_eq!(time, time.as_scalar_ref());
assert_eq!(time, time.to_owned_scalar());
assert_eq!(123, time.into_native());
let time = TimeNanosecond::new(123);
assert_eq!(time, time.as_scalar_ref());
assert_eq!(time, time.to_owned_scalar());
assert_eq!(123, time.into_native());
}
}

View File

@@ -46,6 +46,11 @@ pub enum LogicalTypeId {
TimestampMillisecond,
TimestampMicrosecond,
TimestampNanosecond,
/// A 64-bit time representing the elapsed time since midnight in the unit of `TimeUnit`.
TimeSecond,
TimeMillisecond,
TimeMicrosecond,
TimeNanosecond,
List,
Dictionary,
@@ -93,6 +98,10 @@ impl LogicalTypeId {
ConcreteDataType::null_datatype(),
ConcreteDataType::null_datatype(),
),
LogicalTypeId::TimeSecond => ConcreteDataType::time_second_datatype(),
LogicalTypeId::TimeMillisecond => ConcreteDataType::time_millisecond_datatype(),
LogicalTypeId::TimeMicrosecond => ConcreteDataType::time_microsecond_datatype(),
LogicalTypeId::TimeNanosecond => ConcreteDataType::time_nanosecond_datatype(),
}
}
}

View File

@@ -21,6 +21,7 @@ mod list_type;
mod null_type;
mod primitive_type;
mod string_type;
mod time_type;
mod timestamp_type;
pub use binary_type::BinaryType;
@@ -35,6 +36,9 @@ pub use primitive_type::{
NativeType, OrdPrimitive, UInt16Type, UInt32Type, UInt64Type, UInt8Type, WrapperType,
};
pub use string_type::StringType;
pub use time_type::{
TimeMicrosecondType, TimeMillisecondType, TimeNanosecondType, TimeSecondType, TimeType,
};
pub use timestamp_type::{
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, TimestampType,

View File

@@ -0,0 +1,220 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! TimeType represents the elapsed time since midnight in the unit of `TimeUnit`.
use arrow::datatypes::{
DataType as ArrowDataType, Time32MillisecondType as ArrowTimeMillisecondType,
Time32SecondType as ArrowTimeSecondType, Time64MicrosecondType as ArrowTimeMicrosecondType,
Time64NanosecondType as ArrowTimeNanosecondType, TimeUnit as ArrowTimeUnit,
};
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use enum_dispatch::enum_dispatch;
use paste::paste;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error;
use crate::prelude::{
ConcreteDataType, DataType, LogicalTypeId, MutableVector, ScalarVectorBuilder, Value, ValueRef,
Vector,
};
use crate::time::{TimeMicrosecond, TimeMillisecond, TimeNanosecond, TimeSecond};
use crate::types::LogicalPrimitiveType;
use crate::vectors::{
PrimitiveVector, TimeMicrosecondVector, TimeMicrosecondVectorBuilder, TimeMillisecondVector,
TimeMillisecondVectorBuilder, TimeNanosecondVector, TimeNanosecondVectorBuilder,
TimeSecondVector, TimeSecondVectorBuilder,
};
const SECOND_VARIATION: u64 = 0;
const MILLISECOND_VARIATION: u64 = 3;
const MICROSECOND_VARIATION: u64 = 6;
const NANOSECOND_VARIATION: u64 = 9;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[enum_dispatch(DataType)]
pub enum TimeType {
Second(TimeSecondType),
Millisecond(TimeMillisecondType),
Microsecond(TimeMicrosecondType),
Nanosecond(TimeNanosecondType),
}
impl TimeType {
/// Creates time type from `TimeUnit`.
pub fn from_unit(unit: TimeUnit) -> Self {
match unit {
TimeUnit::Second => Self::Second(TimeSecondType),
TimeUnit::Millisecond => Self::Millisecond(TimeMillisecondType),
TimeUnit::Microsecond => Self::Microsecond(TimeMicrosecondType),
TimeUnit::Nanosecond => Self::Nanosecond(TimeNanosecondType),
}
}
/// Returns the time type's `TimeUnit`.
pub fn unit(&self) -> TimeUnit {
match self {
TimeType::Second(_) => TimeUnit::Second,
TimeType::Millisecond(_) => TimeUnit::Millisecond,
TimeType::Microsecond(_) => TimeUnit::Microsecond,
TimeType::Nanosecond(_) => TimeUnit::Nanosecond,
}
}
/// Returns the time type's precision.
pub fn precision(&self) -> u64 {
match self {
TimeType::Second(_) => SECOND_VARIATION,
TimeType::Millisecond(_) => MILLISECOND_VARIATION,
TimeType::Microsecond(_) => MICROSECOND_VARIATION,
TimeType::Nanosecond(_) => NANOSECOND_VARIATION,
}
}
}
macro_rules! impl_data_type_for_time {
($unit: ident,$arrow_type: ident, $type: ty) => {
paste! {
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct [<Time $unit Type>];
impl DataType for [<Time $unit Type>] {
fn name(&self) -> &str {
stringify!([<Time $unit>])
}
fn logical_type_id(&self) -> LogicalTypeId {
LogicalTypeId::[<Time $unit>]
}
fn default_value(&self) -> Value {
Value::Time(Time::new(0, TimeUnit::$unit))
}
fn as_arrow_type(&self) -> ArrowDataType {
ArrowDataType::$arrow_type(ArrowTimeUnit::$unit)
}
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
Box::new([<Time $unit Vector Builder>]::with_capacity(capacity))
}
fn is_timestamp_compatible(&self) -> bool {
false
}
}
impl LogicalPrimitiveType for [<Time $unit Type>] {
type ArrowPrimitive = [<Arrow Time $unit Type>];
type Native = $type;
type Wrapper = [<Time $unit>];
type LargestType = Self;
fn build_data_type() -> ConcreteDataType {
ConcreteDataType::Time(TimeType::$unit(
[<Time $unit Type>]::default(),
))
}
fn type_name() -> &'static str {
stringify!([<Time $unit Type>])
}
fn cast_vector(vector: &dyn Vector) -> crate::Result<&PrimitiveVector<Self>> {
vector
.as_any()
.downcast_ref::<[<Time $unit Vector>]>()
.with_context(|| error::CastTypeSnafu {
msg: format!(
"Failed to cast {} to {}",
vector.vector_type_name(), stringify!([<Time $unit Vector>])
),
})
}
fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::Int64(v) =>{
Ok(Some([<Time $unit>]::from(v)))
}
ValueRef::Time(t) => match t.unit() {
TimeUnit::$unit => Ok(Some([<Time $unit>](t))),
other => error::CastTypeSnafu {
msg: format!(
"Failed to cast Time value with different unit {:?} to {}",
other, stringify!([<Time $unit>])
),
}
.fail(),
},
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {:?} to {}", other, stringify!([<Time $unit>])),
}
.fail(),
}
}
}
}
}
}
impl_data_type_for_time!(Second, Time32, i32);
impl_data_type_for_time!(Millisecond, Time32, i32);
impl_data_type_for_time!(Nanosecond, Time64, i64);
impl_data_type_for_time!(Microsecond, Time64, i64);
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_time_type_unit() {
assert_eq!(TimeUnit::Second, TimeType::Second(TimeSecondType).unit());
assert_eq!(
TimeUnit::Millisecond,
TimeType::Millisecond(TimeMillisecondType).unit()
);
assert_eq!(
TimeUnit::Microsecond,
TimeType::Microsecond(TimeMicrosecondType).unit()
);
assert_eq!(
TimeUnit::Nanosecond,
TimeType::Nanosecond(TimeNanosecondType).unit()
);
}
#[test]
fn test_as_arrow_datatype() {
assert_eq!(
ArrowDataType::Time32(ArrowTimeUnit::Second),
TimeSecondType::default().as_arrow_type()
);
assert_eq!(
ArrowDataType::Time32(ArrowTimeUnit::Millisecond),
TimeMillisecondType::default().as_arrow_type()
);
assert_eq!(
ArrowDataType::Time64(ArrowTimeUnit::Microsecond),
TimeMicrosecondType::default().as_arrow_type()
);
assert_eq!(
ArrowDataType::Time64(ArrowTimeUnit::Nanosecond),
TimeNanosecondType::default().as_arrow_type()
);
}
}

View File

@@ -22,11 +22,12 @@ use common_base::bytes::{Bytes, StringBytes};
use common_telemetry::logging;
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::time::Time;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion_common::ScalarValue;
pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use snafu::{ensure, ResultExt};
use crate::error;
use crate::error::Result;
@@ -66,6 +67,7 @@ pub enum Value {
Date(Date),
DateTime(DateTime),
Timestamp(Timestamp),
Time(Time),
List(ListValue),
}
@@ -97,6 +99,7 @@ impl Display for Value {
Value::Date(v) => write!(f, "{v}"),
Value::DateTime(v) => write!(f, "{v}"),
Value::Timestamp(v) => write!(f, "{}", v.to_iso8601_string()),
Value::Time(t) => write!(f, "{}", t.to_iso8601_string()),
Value::List(v) => {
let default = Box::<Vec<Value>>::default();
let items = v.items().as_ref().unwrap_or(&default);
@@ -134,6 +137,7 @@ impl Value {
Value::Binary(_) => ConcreteDataType::binary_datatype(),
Value::Date(_) => ConcreteDataType::date_datatype(),
Value::DateTime(_) => ConcreteDataType::datetime_datatype(),
Value::Time(t) => ConcreteDataType::time_datatype(*t.unit()),
Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()),
Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()),
}
@@ -177,6 +181,7 @@ impl Value {
Value::DateTime(v) => ValueRef::DateTime(*v),
Value::List(v) => ValueRef::List(ListValueRef::Ref { val: v }),
Value::Timestamp(v) => ValueRef::Timestamp(*v),
Value::Time(v) => ValueRef::Time(*v),
}
}
@@ -189,6 +194,15 @@ impl Value {
}
}
/// Cast Value to [Time]. Return None if value is not a valid time data type.
pub fn as_time(&self) -> Option<Time> {
match self {
Value::Int64(v) => Some(Time::new_millisecond(*v)),
Value::Time(t) => Some(*t),
_ => None,
}
}
/// Returns the logical type of the value.
pub fn logical_type_id(&self) -> LogicalTypeId {
match self {
@@ -215,6 +229,12 @@ impl Value {
TimeUnit::Microsecond => LogicalTypeId::TimestampMicrosecond,
TimeUnit::Nanosecond => LogicalTypeId::TimestampNanosecond,
},
Value::Time(t) => match t.unit() {
TimeUnit::Second => LogicalTypeId::TimeSecond,
TimeUnit::Millisecond => LogicalTypeId::TimeMillisecond,
TimeUnit::Microsecond => LogicalTypeId::TimeMicrosecond,
TimeUnit::Nanosecond => LogicalTypeId::TimeNanosecond,
},
}
}
@@ -248,21 +268,22 @@ impl Value {
Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
Value::Date(v) => ScalarValue::Date32(Some(v.val())),
Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
Value::Null => to_null_scalar_value(output_type),
Value::Null => to_null_scalar_value(output_type)?,
Value::List(list) => {
// Safety: The logical type of the value and output_type are the same.
let list_type = output_type.as_list().unwrap();
list.try_to_scalar_value(list_type)?
}
Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value()))?,
};
Ok(scalar_value)
}
}
pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> ScalarValue {
match output_type {
pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValue> {
Ok(match output_type {
ConcreteDataType::Null(_) => ScalarValue::Null,
ConcreteDataType::Boolean(_) => ScalarValue::Boolean(None),
ConcreteDataType::Int8(_) => ScalarValue::Int8(None),
@@ -285,9 +306,10 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> ScalarValue {
}
ConcreteDataType::Dictionary(dict) => ScalarValue::Dictionary(
Box::new(dict.key_type().as_arrow_type()),
Box::new(to_null_scalar_value(dict.value_type())),
Box::new(to_null_scalar_value(dict.value_type())?),
),
}
ConcreteDataType::Time(t) => time_to_scalar_value(t.unit(), None)?,
})
}
fn new_item_field(data_type: ArrowDataType) -> Field {
@@ -303,6 +325,22 @@ pub fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValu
}
}
/// Cast the 64-bit elapsed time into the arrow ScalarValue by time unit.
pub fn time_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> Result<ScalarValue> {
Ok(match unit {
TimeUnit::Second => ScalarValue::Time32Second(
val.map(|i| i.try_into().context(error::CastTimeTypeSnafu))
.transpose()?,
),
TimeUnit::Millisecond => ScalarValue::Time32Millisecond(
val.map(|i| i.try_into().context(error::CastTimeTypeSnafu))
.transpose()?,
),
TimeUnit::Microsecond => ScalarValue::Time64Microsecond(val),
TimeUnit::Nanosecond => ScalarValue::Time64Nanosecond(val),
})
}
/// Convert [ScalarValue] to [Timestamp].
/// Return `None` if given scalar value cannot be converted to a valid timestamp.
pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option<Timestamp> {
@@ -348,6 +386,7 @@ macro_rules! impl_ord_for_value_like {
($Type::Date(v1), $Type::Date(v2)) => v1.cmp(v2),
($Type::DateTime(v1), $Type::DateTime(v2)) => v1.cmp(v2),
($Type::Timestamp(v1), $Type::Timestamp(v2)) => v1.cmp(v2),
($Type::Time(v1), $Type::Time(v2)) => v1.cmp(v2),
($Type::List(v1), $Type::List(v2)) => v1.cmp(v2),
_ => panic!(
"Cannot compare different values {:?} and {:?}",
@@ -453,6 +492,7 @@ impl TryFrom<Value> for serde_json::Value {
Value::DateTime(v) => serde_json::Value::Number(v.val().into()),
Value::List(v) => serde_json::to_value(v)?,
Value::Timestamp(v) => serde_json::to_value(v.value())?,
Value::Time(v) => serde_json::to_value(v.value())?,
};
Ok(json_value)
@@ -590,16 +630,25 @@ impl TryFrom<ScalarValue> for Value {
ScalarValue::TimestampNanosecond(t, _) => t
.map(|x| Value::Timestamp(Timestamp::new(x, TimeUnit::Nanosecond)))
.unwrap_or(Value::Null),
ScalarValue::Time32Second(t) => t
.map(|x| Value::Time(Time::new(x as i64, TimeUnit::Second)))
.unwrap_or(Value::Null),
ScalarValue::Time32Millisecond(t) => t
.map(|x| Value::Time(Time::new(x as i64, TimeUnit::Millisecond)))
.unwrap_or(Value::Null),
ScalarValue::Time64Microsecond(t) => t
.map(|x| Value::Time(Time::new(x, TimeUnit::Microsecond)))
.unwrap_or(Value::Null),
ScalarValue::Time64Nanosecond(t) => t
.map(|x| Value::Time(Time::new(x, TimeUnit::Nanosecond)))
.unwrap_or(Value::Null),
ScalarValue::Decimal128(_, _, _)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
| ScalarValue::Dictionary(_, _) => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: v.get_datatype(),
}
@@ -636,6 +685,9 @@ pub enum ValueRef<'a> {
Date(Date),
DateTime(DateTime),
Timestamp(Timestamp),
Time(Time),
// Compound types:
List(ListValueRef<'a>),
}
@@ -687,10 +739,16 @@ impl<'a> ValueRef<'a> {
impl_as_for_value_ref!(self, DateTime)
}
/// Cast itself to [Timestamp].
pub fn as_timestamp(&self) -> Result<Option<Timestamp>> {
impl_as_for_value_ref!(self, Timestamp)
}
/// Cast itself to [Time].
pub fn as_time(&self) -> Result<Option<Time>> {
impl_as_for_value_ref!(self, Time)
}
/// Cast itself to [ListValueRef].
pub fn as_list(&self) -> Result<Option<ListValueRef>> {
impl_as_for_value_ref!(self, List)
@@ -742,6 +800,7 @@ impl_value_ref_from!(Float64, f64);
impl_value_ref_from!(Date, Date);
impl_value_ref_from!(DateTime, DateTime);
impl_value_ref_from!(Timestamp, Timestamp);
impl_value_ref_from!(Time, Time);
impl<'a> From<&'a str> for ValueRef<'a> {
fn from(string: &'a str) -> ValueRef<'a> {
@@ -1005,6 +1064,42 @@ mod tests {
.unwrap()
);
assert_eq!(
Value::Time(Time::new(1, TimeUnit::Second)),
ScalarValue::Time32Second(Some(1)).try_into().unwrap()
);
assert_eq!(
Value::Null,
ScalarValue::Time32Second(None).try_into().unwrap()
);
assert_eq!(
Value::Time(Time::new(1, TimeUnit::Millisecond)),
ScalarValue::Time32Millisecond(Some(1)).try_into().unwrap()
);
assert_eq!(
Value::Null,
ScalarValue::Time32Millisecond(None).try_into().unwrap()
);
assert_eq!(
Value::Time(Time::new(1, TimeUnit::Microsecond)),
ScalarValue::Time64Microsecond(Some(1)).try_into().unwrap()
);
assert_eq!(
Value::Null,
ScalarValue::Time64Microsecond(None).try_into().unwrap()
);
assert_eq!(
Value::Time(Time::new(1, TimeUnit::Nanosecond)),
ScalarValue::Time64Nanosecond(Some(1)).try_into().unwrap()
);
assert_eq!(
Value::Null,
ScalarValue::Time64Nanosecond(None).try_into().unwrap()
);
let result: Result<Value> = ScalarValue::Decimal128(Some(1), 0, 0).try_into();
assert!(result
.unwrap_err()
@@ -1137,6 +1232,22 @@ mod tests {
&ConcreteDataType::timestamp_millisecond_datatype(),
&Value::Timestamp(Timestamp::new_millisecond(1)),
);
check_type_and_value(
&ConcreteDataType::time_second_datatype(),
&Value::Time(Time::new_second(1)),
);
check_type_and_value(
&ConcreteDataType::time_millisecond_datatype(),
&Value::Time(Time::new_millisecond(1)),
);
check_type_and_value(
&ConcreteDataType::time_microsecond_datatype(),
&Value::Time(Time::new_microsecond(1)),
);
check_type_and_value(
&ConcreteDataType::time_nanosecond_datatype(),
&Value::Time(Time::new_nanosecond(1)),
);
}
#[test]
@@ -1232,6 +1343,10 @@ mod tests {
serde_json::Value::Number(1.into()),
to_json(Value::Timestamp(Timestamp::new_millisecond(1)))
);
assert_eq!(
serde_json::Value::Number(1.into()),
to_json(Value::Time(Time::new_millisecond(1)))
);
let json_value: serde_json::Value =
serde_json::from_str(r#"{"items":[{"Int32":123}],"datatype":{"Int32":{}}}"#).unwrap();
@@ -1289,6 +1404,7 @@ mod tests {
check_as_value_ref!(Float32, OrderedF32::from(16.0));
check_as_value_ref!(Float64, OrderedF64::from(16.0));
check_as_value_ref!(Timestamp, Timestamp::new_millisecond(1));
check_as_value_ref!(Time, Time::new_millisecond(1));
assert_eq!(
ValueRef::String("hello"),
@@ -1338,6 +1454,7 @@ mod tests {
check_as_correct!(true, Boolean, as_boolean);
check_as_correct!(Date::new(123), Date, as_date);
check_as_correct!(DateTime::new(12), DateTime, as_datetime);
check_as_correct!(Time::new_second(12), Time, as_time);
let list = ListValue {
items: None,
datatype: ConcreteDataType::int32_datatype(),
@@ -1351,6 +1468,8 @@ mod tests {
assert!(wrong_value.as_date().is_err());
assert!(wrong_value.as_datetime().is_err());
assert!(wrong_value.as_list().is_err());
assert!(wrong_value.as_time().is_err());
assert!(wrong_value.as_timestamp().is_err());
}
#[test]
@@ -1383,6 +1502,10 @@ mod tests {
Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)).to_string(),
"1970-01-01 08:00:01+0800"
);
assert_eq!(
Value::Time(Time::new(1000, TimeUnit::Millisecond)).to_string(),
"08:00:01+0800"
);
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(vec![Value::Int8(1), Value::Int8(2)])),
@@ -1593,6 +1716,31 @@ mod tests {
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::Time32Second(None),
Value::Null
.try_to_scalar_value(&ConcreteDataType::time_second_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::Time32Millisecond(None),
Value::Null
.try_to_scalar_value(&ConcreteDataType::time_millisecond_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::Time64Microsecond(None),
Value::Null
.try_to_scalar_value(&ConcreteDataType::time_microsecond_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::Time64Nanosecond(None),
Value::Null
.try_to_scalar_value(&ConcreteDataType::time_nanosecond_datatype())
.unwrap()
);
}
#[test]
@@ -1638,4 +1786,24 @@ mod tests {
timestamp_to_scalar_value(TimeUnit::Nanosecond, Some(1))
);
}
#[test]
fn test_time_to_scalar_value() {
assert_eq!(
ScalarValue::Time32Second(Some(1)),
time_to_scalar_value(TimeUnit::Second, Some(1)).unwrap()
);
assert_eq!(
ScalarValue::Time32Millisecond(Some(1)),
time_to_scalar_value(TimeUnit::Millisecond, Some(1)).unwrap()
);
assert_eq!(
ScalarValue::Time64Microsecond(Some(1)),
time_to_scalar_value(TimeUnit::Microsecond, Some(1)).unwrap()
);
assert_eq!(
ScalarValue::Time64Nanosecond(Some(1)),
time_to_scalar_value(TimeUnit::Nanosecond, Some(1)).unwrap()
);
}
}

View File

@@ -37,6 +37,7 @@ mod null;
mod operations;
mod primitive;
mod string;
mod time;
mod timestamp;
mod validity;
@@ -56,6 +57,11 @@ pub use primitive::{
UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
};
pub use string::{StringVector, StringVectorBuilder};
pub use time::{
TimeMicrosecondVector, TimeMicrosecondVectorBuilder, TimeMillisecondVector,
TimeMillisecondVectorBuilder, TimeNanosecondVector, TimeNanosecondVectorBuilder,
TimeSecondVector, TimeSecondVectorBuilder,
};
pub use timestamp::{
TimestampMicrosecondVector, TimestampMicrosecondVectorBuilder, TimestampMillisecondVector,
TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampNanosecondVectorBuilder,

View File

@@ -15,11 +15,12 @@
use std::sync::Arc;
use crate::data_type::DataType;
use crate::types::TimestampType;
use crate::types::{TimeType, TimestampType};
use crate::vectors::constant::ConstantVector;
use crate::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, ListVector, PrimitiveVector,
StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
StringVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector,
TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, Vector,
};
use crate::with_match_primitive_type_id;
@@ -106,6 +107,21 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool {
unreachable!("should not compare {} with {}", lhs.vector_type_name(), rhs.vector_type_name())
})
}
Time(t) => match t {
TimeType::Second(_) => {
is_vector_eq!(TimeSecondVector, lhs, rhs)
}
TimeType::Millisecond(_) => {
is_vector_eq!(TimeMillisecondVector, lhs, rhs)
}
TimeType::Microsecond(_) => {
is_vector_eq!(TimeMicrosecondVector, lhs, rhs)
}
TimeType::Nanosecond(_) => {
is_vector_eq!(TimeNanosecondVector, lhs, rhs)
}
},
}
}
@@ -177,6 +193,11 @@ mod tests {
assert_vector_ref_eq(Arc::new(UInt64Vector::from_slice([1, 2, 3, 4])));
assert_vector_ref_eq(Arc::new(Float32Vector::from_slice([1.0, 2.0, 3.0, 4.0])));
assert_vector_ref_eq(Arc::new(Float64Vector::from_slice([1.0, 2.0, 3.0, 4.0])));
assert_vector_ref_eq(Arc::new(TimeSecondVector::from_values([100, 120])));
assert_vector_ref_eq(Arc::new(TimeMillisecondVector::from_values([100, 120])));
assert_vector_ref_eq(Arc::new(TimeMicrosecondVector::from_values([100, 120])));
assert_vector_ref_eq(Arc::new(TimeNanosecondVector::from_values([100, 120])));
}
#[test]
@@ -224,5 +245,10 @@ mod tests {
)),
);
assert_vector_ref_ne(Arc::new(NullVector::new(5)), Arc::new(NullVector::new(8)));
assert_vector_ref_ne(
Arc::new(TimeMicrosecondVector::from_values([100, 120])),
Arc::new(TimeMicrosecondVector::from_values([200, 220])),
);
}
}

View File

@@ -31,7 +31,8 @@ use crate::value::{ListValue, ListValueRef};
use crate::vectors::{
BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Float32Vector,
Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, ListVector,
ListVectorBuilder, MutableVector, NullVector, StringVector, TimestampMicrosecondVector,
ListVectorBuilder, MutableVector, NullVector, StringVector, TimeMicrosecondVector,
TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt16Vector,
UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
};
@@ -194,16 +195,24 @@ impl Helper {
// Timezone is unimplemented now.
ConstantVector::new(Arc::new(TimestampNanosecondVector::from(vec![v])), length)
}
ScalarValue::Time32Second(v) => {
ConstantVector::new(Arc::new(TimeSecondVector::from(vec![v])), length)
}
ScalarValue::Time32Millisecond(v) => {
ConstantVector::new(Arc::new(TimeMillisecondVector::from(vec![v])), length)
}
ScalarValue::Time64Microsecond(v) => {
ConstantVector::new(Arc::new(TimeMicrosecondVector::from(vec![v])), length)
}
ScalarValue::Time64Nanosecond(v) => {
ConstantVector::new(Arc::new(TimeNanosecondVector::from(vec![v])), length)
}
ScalarValue::Decimal128(_, _, _)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_) => {
| ScalarValue::Dictionary(_, _) => {
return error::ConversionSnafu {
from: format!("Unsupported scalar value: {value}"),
}
@@ -261,9 +270,23 @@ impl Helper {
TimestampNanosecondVector::try_from_arrow_timestamp_array(array)?,
),
},
ArrowDataType::Time32(unit) => match unit {
TimeUnit::Second => Arc::new(TimeSecondVector::try_from_arrow_time_array(array)?),
TimeUnit::Millisecond => {
Arc::new(TimeMillisecondVector::try_from_arrow_time_array(array)?)
}
_ => unimplemented!("Arrow array datatype: {:?}", array.as_ref().data_type()),
},
ArrowDataType::Time64(unit) => match unit {
TimeUnit::Microsecond => {
Arc::new(TimeMicrosecondVector::try_from_arrow_time_array(array)?)
}
TimeUnit::Nanosecond => {
Arc::new(TimeNanosecondVector::try_from_arrow_time_array(array)?)
}
_ => unimplemented!("Arrow array datatype: {:?}", array.as_ref().data_type()),
},
ArrowDataType::Float16
| ArrowDataType::Time32(_)
| ArrowDataType::Time64(_)
| ArrowDataType::Duration(_)
| ArrowDataType::Interval(_)
| ArrowDataType::LargeList(_)
@@ -301,10 +324,12 @@ mod tests {
use arrow::array::{
ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, LargeBinaryArray, ListArray, NullArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::datatypes::{Field, Int32Type};
use common_time::time::Time;
use common_time::{Date, DateTime};
use super::*;
@@ -438,5 +463,19 @@ mod tests {
check_try_into_vector(TimestampMillisecondArray::from(vec![1, 2, 3]));
check_try_into_vector(TimestampMicrosecondArray::from(vec![1, 2, 3]));
check_try_into_vector(TimestampNanosecondArray::from(vec![1, 2, 3]));
check_try_into_vector(Time32SecondArray::from(vec![1, 2, 3]));
check_try_into_vector(Time32MillisecondArray::from(vec![1, 2, 3]));
check_try_into_vector(Time64MicrosecondArray::from(vec![1, 2, 3]));
check_try_into_vector(Time64NanosecondArray::from(vec![1, 2, 3]));
}
#[test]
fn test_try_from_scalar_time_value() {
let vector = Helper::try_from_scalar_value(ScalarValue::Time32Second(Some(42)), 3).unwrap();
assert_eq!(ConcreteDataType::time_second_datatype(), vector.data_type());
assert_eq!(3, vector.len());
for i in 0..vector.len() {
assert_eq!(Value::Time(Time::new_second(42)), vector.get(i));
}
}
}

View File

@@ -18,6 +18,8 @@ use std::sync::Arc;
use arrow::array::{
Array, ArrayBuilder, ArrayData, ArrayIter, ArrayRef, PrimitiveArray, PrimitiveBuilder,
Time32MillisecondArray as TimeMillisecondArray, Time32SecondArray as TimeSecondArray,
Time64MicrosecondArray as TimeMicrosecondArray, Time64NanosecondArray as TimeNanosecondArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
@@ -110,6 +112,44 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
Ok(Self::new(concrete_array))
}
/// Converts arrow time array to vectors
pub fn try_from_arrow_time_array(array: impl AsRef<dyn Array>) -> Result<Self> {
let array = array.as_ref();
let array_data = match array.data_type() {
DataType::Time32(unit) => match unit {
arrow_schema::TimeUnit::Second => array
.as_any()
.downcast_ref::<TimeSecondArray>()
.unwrap()
.to_data(),
arrow_schema::TimeUnit::Millisecond => array
.as_any()
.downcast_ref::<TimeMillisecondArray>()
.unwrap()
.to_data(),
_ => unreachable!(),
},
DataType::Time64(unit) => match unit {
arrow_schema::TimeUnit::Microsecond => array
.as_any()
.downcast_ref::<TimeMicrosecondArray>()
.unwrap()
.to_data(),
arrow_schema::TimeUnit::Nanosecond => array
.as_any()
.downcast_ref::<TimeNanosecondArray>()
.unwrap()
.to_data(),
_ => unreachable!(),
},
_ => {
unreachable!()
}
};
let concrete_array = PrimitiveArray::<T::ArrowPrimitive>::from(array_data);
Ok(Self::new(concrete_array))
}
pub fn from_slice<P: AsRef<[T::Native]>>(slice: P) -> Self {
let iter = slice.as_ref().iter().copied();
Self {
@@ -441,7 +481,10 @@ pub(crate) fn replicate_primitive<T: LogicalPrimitiveType>(
#[cfg(test)]
mod tests {
use arrow::array::Int32Array;
use arrow::array::{
Int32Array, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray,
};
use arrow::datatypes::DataType as ArrowDataType;
use serde_json;
@@ -449,6 +492,9 @@ mod tests {
use crate::data_type::DataType;
use crate::serialize::Serializable;
use crate::types::Int64Type;
use crate::vectors::{
TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector,
};
fn check_vec(v: Int32Vector) {
assert_eq!(4, v.len());
@@ -599,4 +645,23 @@ mod tests {
test_from_wrapper_slice!(Float32Vector, f32);
test_from_wrapper_slice!(Float64Vector, f64);
}
#[test]
fn test_try_from_arrow_time_array() {
let array: ArrayRef = Arc::new(Time32SecondArray::from(vec![1i32, 2, 3]));
let vector = TimeSecondVector::try_from_arrow_time_array(array).unwrap();
assert_eq!(TimeSecondVector::from_values(vec![1, 2, 3]), vector);
let array: ArrayRef = Arc::new(Time32MillisecondArray::from(vec![1i32, 2, 3]));
let vector = TimeMillisecondVector::try_from_arrow_time_array(array).unwrap();
assert_eq!(TimeMillisecondVector::from_values(vec![1, 2, 3]), vector);
let array: ArrayRef = Arc::new(Time64MicrosecondArray::from(vec![1i64, 2, 3]));
let vector = TimeMicrosecondVector::try_from_arrow_time_array(array).unwrap();
assert_eq!(TimeMicrosecondVector::from_values(vec![1, 2, 3]), vector);
let array: ArrayRef = Arc::new(Time64NanosecondArray::from(vec![1i64, 2, 3]));
let vector = TimeNanosecondVector::try_from_arrow_time_array(array).unwrap();
assert_eq!(TimeNanosecondVector::from_values(vec![1, 2, 3]), vector);
}
}

View File

@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::types::{TimeMicrosecondType, TimeMillisecondType, TimeNanosecondType, TimeSecondType};
use crate::vectors::{PrimitiveVector, PrimitiveVectorBuilder};
pub type TimeSecondVector = PrimitiveVector<TimeSecondType>;
pub type TimeSecondVectorBuilder = PrimitiveVectorBuilder<TimeSecondType>;
pub type TimeMillisecondVector = PrimitiveVector<TimeMillisecondType>;
pub type TimeMillisecondVectorBuilder = PrimitiveVectorBuilder<TimeMillisecondType>;
pub type TimeMicrosecondVector = PrimitiveVector<TimeMicrosecondType>;
pub type TimeMicrosecondVectorBuilder = PrimitiveVectorBuilder<TimeMicrosecondType>;
pub type TimeNanosecondVector = PrimitiveVector<TimeNanosecondType>;
pub type TimeNanosecondVectorBuilder = PrimitiveVectorBuilder<TimeNanosecondType>;

View File

@@ -80,7 +80,7 @@ impl PrometheusGateway for PrometheusGatewayService {
let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes();
let response = Response::new(PromqlResponse {
header: Some(ResponseHeader {}),
header: Some(ResponseHeader { status: None }),
body: json_bytes,
});
Ok(response)

View File

@@ -153,7 +153,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
}
.fail(),
},
ValueInner::NULL => Ok(value::to_null_scalar_value(t)),
ValueInner::NULL => value::to_null_scalar_value(t).context(error::ConvertScalarValueSnafu),
ValueInner::Bytes(b) => match t {
ConcreteDataType::String(_) => Ok(ScalarValue::Utf8(Some(
String::from_utf8_lossy(b).to_string(),

View File

@@ -190,6 +190,8 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
),
})
}
Value::Time(v) => row_writer
.write_col(v.to_timezone_aware_string(query_context.time_zone()))?,
}
}
row_writer.end_row().await?;
@@ -236,6 +238,7 @@ pub(crate) fn create_mysql_column(
Ok(ColumnType::MYSQL_TYPE_VARCHAR)
}
ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
ConcreteDataType::DateTime(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME),
_ => error::InternalSnafu {

View File

@@ -88,6 +88,15 @@ pub(super) fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWir
})))
}
}
Value::Time(v) => {
if let Some(time) = v.to_chrono_time() {
builder.encode_field(&time)
} else {
Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Failed to convert time to postgres type {v:?}",),
})))
}
}
Value::List(_) => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!(
"cannot write value {:?} in postgres protocol: unimplemented",
@@ -112,6 +121,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
&ConcreteDataType::Date(_) => Ok(Type::DATE),
&ConcreteDataType::DateTime(_) => Ok(Type::TIMESTAMP),
&ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
&ConcreteDataType::Time(_) => Ok(Type::TIME),
&ConcreteDataType::List(_) | &ConcreteDataType::Dictionary(_) => error::InternalSnafu {
err_msg: format!("not implemented for column datatype {origin:?}"),
}
@@ -516,6 +526,7 @@ mod test {
true,
),
ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
];
let pg_field_info = vec![
FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
@@ -564,6 +575,7 @@ mod test {
FieldFormat::Text,
),
FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
];
let schema = Schema::new(column_schemas);
let fs = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
@@ -644,6 +656,7 @@ mod test {
FieldFormat::Text,
),
FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
FieldInfo::new(
"datetimes".into(),
None,
@@ -684,6 +697,7 @@ mod test {
Value::String("greptime".into()),
Value::Binary("greptime".as_bytes().into()),
Value::Date(1001i32.into()),
Value::Time(1001i64.into()),
Value::DateTime(1000001i64.into()),
Value::Timestamp(1000001i64.into()),
];

View File

@@ -396,6 +396,10 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
Some(ts_type.precision()),
TimezoneInfo::None,
)),
ConcreteDataType::Time(time_type) => Ok(SqlDataType::Time(
Some(time_type.precision()),
TimezoneInfo::None,
)),
ConcreteDataType::Binary(_) => Ok(SqlDataType::Varbinary(None)),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()

View File

@@ -0,0 +1,6 @@
-- SQLNESS REPLACE (\d+:\d+:\d+\.\d+) TIME
-- SQLNESS REPLACE [\s\-]+
select current_time();
++|current_time()|++|TIME|++

View File

@@ -0,0 +1,3 @@
-- SQLNESS REPLACE (\d+:\d+:\d+\.\d+) TIME
-- SQLNESS REPLACE [\s\-]+
select current_time();