diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index fa02e5f58b..50bb715fc3 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -222,7 +222,9 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> // Timestamp in key part is intentionally left to 0 columns_values.insert( "timestamp".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(0)])) as _, + Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( + 0, + )])) as _, ); columns_values.insert( @@ -232,14 +234,14 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> columns_values.insert( "gmt_created".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( util::current_time_millis(), )])) as _, ); columns_values.insert( "gmt_modified".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( util::current_time_millis(), )])) as _, ); diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 515ffcbe7e..f968ff9b56 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -22,11 +22,11 @@ use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateEx use common_base::BitVec; use common_time::timestamp::Timestamp; use common_time::{Date, DateTime}; -use datatypes::data_type::ConcreteDataType; +use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::{ValueRef, VectorRef}; use datatypes::schema::SchemaRef; use datatypes::value::Value; -use datatypes::vectors::VectorBuilder; +use datatypes::vectors::MutableVector; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; @@ -99,7 +99,7 @@ pub fn 
column_to_vector(column: &Column, rows: u32) -> Result { let column_datatype = wrapper.datatype(); let rows = rows as usize; - let mut vector = VectorBuilder::with_capacity(wrapper.into(), rows); + let mut vector = ConcreteDataType::from(wrapper).create_mutable_vector(rows); if let Some(values) = &column.values { let values = collect_column_values(column_datatype, values); @@ -110,21 +110,31 @@ pub fn column_to_vector(column: &Column, rows: u32) -> Result { for i in 0..rows { if let Some(true) = nulls_iter.next() { - vector.push_null(); + vector + .push_value_ref(ValueRef::Null) + .context(CreateVectorSnafu)?; } else { - let value_ref = values_iter.next().context(InvalidColumnProtoSnafu { - err_msg: format!( - "value not found at position {} of column {}", - i, &column.column_name - ), - })?; - vector.try_push_ref(value_ref).context(CreateVectorSnafu)?; + let value_ref = values_iter + .next() + .with_context(|| InvalidColumnProtoSnafu { + err_msg: format!( + "value not found at position {} of column {}", + i, &column.column_name + ), + })?; + vector + .push_value_ref(value_ref) + .context(CreateVectorSnafu)?; } } } else { - (0..rows).for_each(|_| vector.push_null()); + (0..rows).try_for_each(|_| { + vector + .push_value_ref(ValueRef::Null) + .context(CreateVectorSnafu) + })?; } - Ok(vector.finish()) + Ok(vector.to_vector()) } fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec { @@ -174,9 +184,24 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve DateTime::new(*v) )) } - ColumnDataType::Timestamp => { + ColumnDataType::TimestampSecond => { + collect_values!(values.ts_second_values, |v| ValueRef::Timestamp( + Timestamp::new_second(*v) + )) + } + ColumnDataType::TimestampMillisecond => { collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp( - Timestamp::from_millis(*v) + Timestamp::new_millisecond(*v) + )) + } + ColumnDataType::TimestampMicrosecond => { + 
collect_values!(values.ts_microsecond_values, |v| ValueRef::Timestamp( + Timestamp::new_microsecond(*v) + )) + } + ColumnDataType::TimestampNanosecond => { + collect_values!(values.ts_nanosecond_values, |v| ValueRef::Timestamp( + Timestamp::new_nanosecond(*v) )) } } @@ -289,10 +314,7 @@ pub fn insertion_expr_to_request( }, )?; let data_type = &column_schema.data_type; - entry.insert(VectorBuilder::with_capacity( - data_type.clone(), - row_count as usize, - )) + entry.insert(data_type.create_mutable_vector(row_count as usize)) } }; add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?; @@ -300,7 +322,7 @@ pub fn insertion_expr_to_request( } let columns_values = columns_builders .into_iter() - .map(|(column_name, mut vector_builder)| (column_name, vector_builder.finish())) + .map(|(column_name, mut vector_builder)| (column_name, vector_builder.to_vector())) .collect(); Ok(InsertRequest { @@ -312,7 +334,7 @@ pub fn insertion_expr_to_request( } fn add_values_to_builder( - builder: &mut VectorBuilder, + builder: &mut Box, values: Values, row_count: usize, null_mask: Vec, @@ -323,9 +345,11 @@ fn add_values_to_builder( if null_mask.is_empty() { ensure!(values.len() == row_count, IllegalInsertDataSnafu); - values.iter().for_each(|value| { - builder.push(value); - }); + values.iter().try_for_each(|value| { + builder + .push_value_ref(value.as_value_ref()) + .context(CreateVectorSnafu) + })?; } else { let null_mask = BitVec::from_vec(null_mask); ensure!( @@ -336,9 +360,13 @@ fn add_values_to_builder( let mut idx_of_values = 0; for idx in 0..row_count { match is_null(&null_mask, idx) { - Some(true) => builder.push(&Value::Null), + Some(true) => builder + .push_value_ref(ValueRef::Null) + .context(CreateVectorSnafu)?, _ => { - builder.push(&values[idx_of_values]); + builder + .push_value_ref(values[idx_of_values].as_value_ref()) + .context(CreateVectorSnafu)?; idx_of_values += 1 } } @@ -420,7 +448,7 @@ fn convert_values(data_type: &ConcreteDataType, 
values: Values) -> Vec { ConcreteDataType::Timestamp(_) => values .ts_millisecond_values .into_iter() - .map(|v| Value::Timestamp(Timestamp::from_millis(v))) + .map(|v| Value::Timestamp(Timestamp::new_millisecond(v))) .collect(), ConcreteDataType::Null(_) => unreachable!(), ConcreteDataType::List(_) => unreachable!(), @@ -624,8 +652,8 @@ mod tests { assert_eq!(Value::Float64(0.1.into()), memory.get(1)); let ts = insert_req.columns_values.get("ts").unwrap(); - assert_eq!(Value::Timestamp(Timestamp::from_millis(100)), ts.get(0)); - assert_eq!(Value::Timestamp(Timestamp::from_millis(101)), ts.get(1)); + assert_eq!(Value::Timestamp(Timestamp::new_millisecond(100)), ts.get(0)); + assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1)); } #[test] @@ -753,7 +781,7 @@ mod tests { semantic_type: TIMESTAMP_SEMANTIC_TYPE, values: Some(ts_vals), null_mask: vec![0], - datatype: ColumnDataType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, }; ( diff --git a/src/common/time/src/date.rs b/src/common/time/src/date.rs index 1a0db47cb3..b12eb9f50d 100644 --- a/src/common/time/src/date.rs +++ b/src/common/time/src/date.rs @@ -98,7 +98,7 @@ mod tests { Date::from_str("1969-01-01").unwrap().to_string() ); - let now = Utc::now().date().format("%F").to_string(); + let now = Utc::now().date_naive().format("%F").to_string(); assert_eq!(now, Date::from_str(&now).unwrap().to_string()); } diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 991b07e2ea..b3de23d01d 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -35,13 +35,34 @@ impl Timestamp { Self { unit, value } } - pub fn from_millis(value: i64) -> Self { + pub fn new_second(value: i64) -> Self { + Self { + value, + unit: TimeUnit::Second, + } + } + + pub fn new_millisecond(value: i64) -> Self { Self { value, unit: TimeUnit::Millisecond, } } + pub fn new_microsecond(value: i64) -> Self { + Self { + value, + unit: 
TimeUnit::Microsecond, + } + } + + pub fn new_nanosecond(value: i64) -> Self { + Self { + value, + unit: TimeUnit::Nanosecond, + } + } + pub fn unit(&self) -> TimeUnit { self.unit } @@ -274,10 +295,11 @@ mod tests { // but expected timestamp is in UTC timezone fn check_from_str(s: &str, expect: &str) { let ts = Timestamp::from_str(s).unwrap(); - let time = NaiveDateTime::from_timestamp( + let time = NaiveDateTime::from_timestamp_opt( ts.value / 1_000_000_000, (ts.value % 1_000_000_000) as u32, - ); + ) + .unwrap(); assert_eq!(expect, time.to_string()); } @@ -290,7 +312,13 @@ mod tests { check_from_str( "2020-09-08 13:42:29", &NaiveDateTime::from_timestamp_opt( - 1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64, + 1599572549 + - Local + .timestamp_opt(0, 0) + .unwrap() + .offset() + .fix() + .local_minus_utc() as i64, 0, ) .unwrap() @@ -300,7 +328,13 @@ mod tests { check_from_str( "2020-09-08T13:42:29", &NaiveDateTime::from_timestamp_opt( - 1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64, + 1599572549 + - Local + .timestamp_opt(0, 0) + .unwrap() + .offset() + .fix() + .local_minus_utc() as i64, 0, ) .unwrap() @@ -310,7 +344,13 @@ mod tests { check_from_str( "2020-09-08 13:42:29.042", &NaiveDateTime::from_timestamp_opt( - 1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64, + 1599572549 + - Local + .timestamp_opt(0, 0) + .unwrap() + .offset() + .fix() + .local_minus_utc() as i64, 42000000, ) .unwrap() @@ -321,7 +361,13 @@ mod tests { check_from_str( "2020-09-08T13:42:29.042", &NaiveDateTime::from_timestamp_opt( - 1599572549 - Local.timestamp(0, 0).offset().fix().local_minus_utc() as i64, + 1599572549 + - Local + .timestamp_opt(0, 0) + .unwrap() + .offset() + .fix() + .local_minus_utc() as i64, 42000000, ) .unwrap() @@ -341,19 +387,19 @@ mod tests { assert_eq!(datetime_str, ts.to_iso8601_string()); let ts_millis = 1668070237000; - let ts = Timestamp::from_millis(ts_millis); + let ts = 
Timestamp::new_millisecond(ts_millis); assert_eq!("2022-11-10 08:50:37+0000", ts.to_iso8601_string()); let ts_millis = -1000; - let ts = Timestamp::from_millis(ts_millis); + let ts = Timestamp::new_millisecond(ts_millis); assert_eq!("1969-12-31 23:59:59+0000", ts.to_iso8601_string()); let ts_millis = -1; - let ts = Timestamp::from_millis(ts_millis); + let ts = Timestamp::new_millisecond(ts_millis); assert_eq!("1969-12-31 23:59:59.999+0000", ts.to_iso8601_string()); let ts_millis = -1001; - let ts = Timestamp::from_millis(ts_millis); + let ts = Timestamp::new_millisecond(ts_millis); assert_eq!("1969-12-31 23:59:58.999+0000", ts.to_iso8601_string()); } diff --git a/src/common/time/src/util.rs b/src/common/time/src/util.rs index 3d3baebc2e..1917ce3456 100644 --- a/src/common/time/src/util.rs +++ b/src/common/time/src/util.rs @@ -33,8 +33,8 @@ mod tests { .duration_since(time::UNIX_EPOCH) .unwrap() .as_millis() as i64; - let datetime_now = chrono::Utc.timestamp_millis(now); - let datetime_std = chrono::Utc.timestamp_millis(millis_from_std); + let datetime_now = chrono::Utc.timestamp_millis_opt(now).unwrap(); + let datetime_std = chrono::Utc.timestamp_millis_opt(millis_from_std).unwrap(); assert_eq!(datetime_std.year(), datetime_now.year()); assert_eq!(datetime_std.month(), datetime_now.month()); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index cef3d38b49..e578bec1e9 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -288,11 +288,11 @@ mod tests { let ts = &columns_values["ts"]; assert_eq!(2, ts.len()); assert_eq!( - Value::from(Timestamp::from_millis(1655276557000i64)), + Value::from(Timestamp::new_millisecond(1655276557000i64)), ts.get(0) ); assert_eq!( - Value::from(Timestamp::from_millis(1655276558000i64)), + Value::from(Timestamp::new_millisecond(1655276558000i64)), ts.get(1) ); } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 257cad2d87..d7cf5325bd 100644 --- a/src/datatypes/src/value.rs +++ 
b/src/datatypes/src/value.rs @@ -1085,7 +1085,7 @@ mod tests { ); check_type_and_value( &ConcreteDataType::timestamp_millisecond_datatype(), - &Value::Timestamp(Timestamp::from_millis(1)), + &Value::Timestamp(Timestamp::new_millisecond(1)), ); } @@ -1180,7 +1180,7 @@ mod tests { assert_eq!( serde_json::Value::Number(1.into()), - to_json(Value::Timestamp(Timestamp::from_millis(1))) + to_json(Value::Timestamp(Timestamp::new_millisecond(1))) ); let json_value: serde_json::Value = @@ -1238,7 +1238,7 @@ mod tests { check_as_value_ref!(Int64, -12); check_as_value_ref!(Float32, OrderedF32::from(16.0)); check_as_value_ref!(Float64, OrderedF64::from(16.0)); - check_as_value_ref!(Timestamp, Timestamp::from_millis(1)); + check_as_value_ref!(Timestamp, Timestamp::new_millisecond(1)); assert_eq!( ValueRef::String("hello"), diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index 951ad2f953..91bfd1a6ed 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -875,7 +875,7 @@ pub fn pyobj_try_to_typed_val( // FIXME(dennis): we always consider the timestamp unit is millis, it's not correct if user define timestamp column with other units. 
obj.try_into_value::(vm) .ok() - .map(Timestamp::from_millis) + .map(Timestamp::new_millisecond) .map(value::Value::Timestamp) } _ => unreachable!(), diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 1504d737ae..e4e40ec916 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -104,17 +104,19 @@ impl ScriptsTable { // Timestamp in key part is intentionally left to 0 columns_values.insert( "timestamp".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(0)])) as _, + Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( + 0, + )])) as _, ); columns_values.insert( "gmt_created".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( util::current_time_millis(), )])) as _, ); columns_values.insert( "gmt_modified".to_string(), - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + Arc::new(TimestampVector::from_slice(&[Timestamp::new_millisecond( util::current_time_millis(), )])) as _, ); diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index f4fc7bf8f3..211e720399 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -208,9 +208,9 @@ mod tests { let cpu = columns.get("cpu").unwrap(); let expected: Vec = vec![ - Value::Timestamp(Timestamp::from_millis(1665893727685_i64)), - Value::Timestamp(Timestamp::from_millis(1665893727686_i64)), - Value::Timestamp(Timestamp::from_millis(1665893727689_i64)), + Value::Timestamp(Timestamp::new_millisecond(1665893727685_i64)), + Value::Timestamp(Timestamp::new_millisecond(1665893727686_i64)), + Value::Timestamp(Timestamp::new_millisecond(1665893727689_i64)), ]; assert_vector(&expected, ts); diff --git a/src/storage/benches/memtable/mod.rs b/src/storage/benches/memtable/mod.rs index 462c3edc28..d3335f8e5e 100644 --- a/src/storage/benches/memtable/mod.rs +++ b/src/storage/benches/memtable/mod.rs @@ -73,7 
+73,7 @@ fn kvs_with_index( UInt64VectorBuilder::with_capacity(keys.len()), ); for key in keys { - key_builders.0.push(Some(Timestamp::from_millis(key.0))); + key_builders.0.push(Some(Timestamp::new_millisecond(key.0))); key_builders.1.push(Some(key.1)); } let row_keys = vec![ diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 6f0ea70b0f..b119cb7cea 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -195,7 +195,10 @@ mod tests { for i in 0..row_num { let ts = batch.column(0).get(i); let v = batch.column(1).get(i); - assert_eq!(Value::Timestamp(Timestamp::from_millis(data[index].0)), ts); + assert_eq!( + Value::Timestamp(Timestamp::new_millisecond(data[index].0)), + ts + ); assert_eq!(Value::from(data[index].1), v); assert_eq!(Value::from(sequence), batch.column(2).get(i));