diff --git a/README.md b/README.md index ee5a97cfb4..fea61bd1cc 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/fronten ```SQL CREATE TABLE monitor ( host STRING, - ts BIGINT, + ts TIMESTAMP, cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index a8d9b2dd82..0e5db53d0e 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -9,7 +9,7 @@ use common_time::timestamp::Timestamp; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector}; use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; -use datatypes::vectors::{BinaryVector, Int64Vector, TimestampVector, UInt8Vector}; +use datatypes::vectors::{BinaryVector, TimestampVector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; @@ -140,12 +140,12 @@ fn build_system_catalog_schema() -> Schema { ), ColumnSchema::new( "gmt_created".to_string(), - ConcreteDataType::int64_datatype(), + ConcreteDataType::timestamp_millis_datatype(), false, ), ColumnSchema::new( "gmt_modified".to_string(), - ConcreteDataType::int64_datatype(), + ConcreteDataType::timestamp_millis_datatype(), false, ), ]; @@ -186,12 +186,16 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> columns_values.insert( "gmt_created".to_string(), - Arc::new(Int64Vector::from_slice(&[util::current_time_millis()])) as _, + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + util::current_time_millis(), + )])) as _, ); columns_values.insert( "gmt_modified".to_string(), - Arc::new(Int64Vector::from_slice(&[util::current_time_millis()])) as _, + Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( + util::current_time_millis(), + )])) as _, ); InsertRequest { diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 498c446b96..5e6b275fe9 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -234,7 +234,7 @@ mod tests { }, ColumnDef { name: "ts".to_string(), - data_type: 4, // int64 + data_type: 15, // timestamp is_nullable: false, }, ColumnDef { @@ -270,7 +270,7 @@ mod tests { }, ColumnSchema { name: "ts".to_string(), - data_type: ConcreteDataType::int64_datatype(), + data_type: ConcreteDataType::timestamp_millis_datatype(), is_nullable: false, }, ColumnSchema { diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index e819ac37c7..2aab556ff8 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -207,6 +207,7 @@ mod tests { use common_base::BitVec; use common_query::prelude::Expr; use common_recordbatch::SendableRecordBatchStream; + use common_time::timestamp::Timestamp; use datatypes::{ data_type::ConcreteDataType, schema::{ColumnSchema, SchemaBuilder, SchemaRef}, @@ -241,8 +242,8 @@ mod tests { assert_eq!(Value::Float64(0.1.into()), memory.get(1)); let ts = insert_req.columns_values.get("ts").unwrap(); - assert_eq!(Value::Int64(100), ts.get(0)); - assert_eq!(Value::Int64(101), ts.get(1)); + assert_eq!(Value::Timestamp(Timestamp::from_millis(100)), ts.get(0)); + assert_eq!(Value::Timestamp(Timestamp::from_millis(101)), ts.get(1)); } #[test] @@ -292,7 +293,7 @@ mod tests { ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true), ]; Arc::new( @@ -314,7 +315,7 @@ mod tests { fn mock_insert_batches() -> Vec> { const SEMANTIC_TAG: i32 = 0; - const SEMANTIC_FEILD: i32 = 1; + const SEMANTIC_FIELD: i32 = 1; const SEMANTIC_TS: i32 = 2; let row_count = 2; @@ -337,7 +338,7 @@ mod tests { }; let cpu_column = Column { column_name: "cpu".to_string(), - semantic_type: SEMANTIC_FEILD, + semantic_type: SEMANTIC_FIELD, values: Some(cpu_vals), null_mask: vec![2], ..Default::default() @@ -349,14 +350,14 @@ mod tests { }; let mem_column = Column { column_name: "memory".to_string(), - semantic_type: SEMANTIC_FEILD, + semantic_type: SEMANTIC_FIELD, values: Some(mem_vals), null_mask: vec![1], ..Default::default() }; let ts_vals = column::Values { - i64_values: vec![100, 101], + ts_millis_values: vec![100, 101], ..Default::default() }; let ts_column = Column { @@ -364,7 +365,7 @@ mod tests { semantic_type: SEMANTIC_TS, values: Some(ts_vals), null_mask: vec![0], - ..Default::default() + datatype: 15, }; let insert_batch = InsertBatch { diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index efd2db52a7..7d99adb380 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -62,6 +62,7 @@ mod tests { use catalog::SchemaProvider; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; + use common_time::timestamp::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use datatypes::value::Value; @@ -92,7 +93,7 @@ mod tests { ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true), ]; Arc::new( @@ -208,8 +209,14 @@ mod tests { let ts = &columns_values["ts"]; assert_eq!(2, ts.len()); - assert_eq!(Value::from(1655276557000i64), ts.get(0)); - assert_eq!(Value::from(1655276558000i64), ts.get(1)); + assert_eq!( + Value::from(Timestamp::from_millis(1655276557000i64)), + ts.get(0) + ); + assert_eq!( + Value::from(Timestamp::from_millis(1655276558000i64)), + ts.get(1) + ); } _ => { panic!("Not supposed to reach here") diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 6fc88ad26e..ed279d6fbe 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -183,7 +183,7 @@ mod tests { #[tokio::test] pub async fn test_create_to_request() { let handler = create_mock_sql_handler().await; - let parsed_stmt = sql_to_statement("create table demo_table( host string, ts bigint, cpu double default 0, memory double, TIME INDEX (ts), PRIMARY KEY(ts, host)) engine=mito with(regions=1);"); + let parsed_stmt = sql_to_statement("create table demo_table( host string, ts timestamp, cpu double default 0, memory double, TIME INDEX (ts), PRIMARY KEY(ts, host)) engine=mito with(regions=1);"); let c = handler.create_to_request(42, parsed_stmt).unwrap(); assert_eq!("demo_table", c.table_name); assert_eq!(42, c.id); @@ -209,7 +209,7 @@ mod tests { pub async fn test_primary_key_not_specified() { let handler = create_mock_sql_handler().await; - let parsed_stmt = sql_to_statement("create table demo_table( host string, ts bigint, cpu double default 0, memory double, TIME INDEX (ts)) engine=mito with(regions=1);"); + let parsed_stmt = sql_to_statement("create table demo_table( host string, ts timestamp, cpu double default 0, memory double, TIME INDEX (ts)) engine=mito with(regions=1);"); let c = handler.create_to_request(42, parsed_stmt).unwrap(); assert_eq!(1, c.primary_key_indices.len()); @@ -237,7 +237,7 @@ mod tests { let create_table = sql_to_statement( r"create table c.s.demo( host string, - ts bigint, + ts timestamp, cpu double default 0, memory double, TIME INDEX (ts), @@ -267,7 +267,7 @@ mod tests { .data_type ); assert_eq!( - ConcreteDataType::int64_datatype(), + ConcreteDataType::timestamp_millis_datatype(), request .schema .column_schema_by_name("ts") diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 0715f17fff..11cb2fa171 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -4,6 +4,7 @@ use datafusion::arrow_print; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use query::Output; +use crate::error; use crate::instance::Instance; use crate::tests::test_util; @@ -67,7 +68,7 @@ pub async fn test_execute_create() { .execute_sql( r#"create table test_table( host string, - ts bigint, + ts timestamp, cpu double default 0, memory double, TIME INDEX (ts), @@ -79,6 +80,29 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(1))); } +#[tokio::test] +pub async fn test_create_table_illegal_timestamp_type() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + let output = instance + .execute_sql( + r#"create table test_table( + host string, + ts bigint, + cpu double default 0, + memory double, + TIME INDEX (ts), + PRIMARY KEY(ts, host) + ) engine=mito with(regions=1);"#, + ) + .await; + assert!(matches!(output, Err(error::Error::CreateSchema { .. }))); +} + #[tokio::test] async fn test_alter_table() { common_telemetry::init_default_ut_logging(); diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 3017362023..2493072bbf 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -87,10 +87,7 @@ impl ConcreteDataType { } pub fn is_timestamp(&self) -> bool { - matches!( - self, - ConcreteDataType::Int64(_) | ConcreteDataType::Timestamp(_) - ) + matches!(self, ConcreteDataType::Timestamp(_)) } pub fn numerics() -> Vec { @@ -340,4 +337,17 @@ mod tests { ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Second) ); } + + #[test] + pub fn test_is_timestamp() { + assert!(ConcreteDataType::timestamp_millis_datatype().is_timestamp()); + assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Second).is_timestamp()); + assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond).is_timestamp()); + assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond).is_timestamp()); + assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond).is_timestamp()); + + // since timestamp data type is implemented, int64 is no longer allowed + // to be used a data type for timestamp column + assert!(!ConcreteDataType::int64_datatype().is_timestamp()); + } } diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 4179940ab2..e588200e62 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -382,7 +382,7 @@ mod tests { fn test_schema_with_timestamp() { let column_schemas = vec![ ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), ]; let schema = SchemaBuilder::from(column_schemas.clone()) .timestamp_index(1) diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index f98596a00d..67f837fcbe 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -5,7 +5,7 @@ use std::time::Duration; use common_time::{RangeMillis, TimestampMillis}; use datatypes::prelude::ScalarVector; use datatypes::schema::SchemaRef; -use datatypes::vectors::{Int64Vector, NullVector, VectorRef}; +use datatypes::vectors::{NullVector, TimestampVector, VectorRef}; use snafu::{ensure, OptionExt}; use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber}; @@ -210,7 +210,7 @@ struct SliceIndex { /// Panics if the duration is too large to be represented by i64, or `timestamps` are not all /// included by `time_range_indexes`. fn compute_slice_indexes( - timestamps: &Int64Vector, + timestamps: &TimestampVector, duration: Duration, time_range_indexes: &RangeIndexMap, ) -> Vec { @@ -228,7 +228,7 @@ fn compute_slice_indexes( for (i, ts) in timestamps.iter_data().enumerate() { // Find index for time range of the timestamp. let current_range_index = ts - .and_then(|v| TimestampMillis::new(v).align_by_bucket(duration_ms)) + .and_then(|v| TimestampMillis::new(v.value()).align_by_bucket(duration_ms)) .and_then(|aligned| time_range_indexes.get(&aligned).copied()); match current_range_index { @@ -289,6 +289,9 @@ fn compute_slice_indexes( #[cfg(test)] mod tests { + use common_time::timestamp::Timestamp; + use datatypes::prelude::ScalarVectorBuilder; + use datatypes::vectors::{Int64Vector, TimestampVector, TimestampVectorBuilder}; use datatypes::{type_id::LogicalTypeId, value::Value}; use store_api::storage::{PutOperation, WriteRequest}; @@ -318,12 +321,18 @@ mod tests { ) { assert!(duration > 0); - let timestamps = Int64Vector::from_iter(timestamps.iter()); + let mut builder = TimestampVectorBuilder::with_capacity(0); + for v in timestamps { + builder.push(v.map(common_time::timestamp::Timestamp::from_millis)); + } + + let ts_vec = builder.finish(); + let time_ranges = new_time_ranges(range_starts, duration); let time_range_indexes = new_range_index_map(&time_ranges); let slice_indexes = compute_slice_indexes( - ×tamps, + &ts_vec, Duration::from_millis(duration as u64), &time_range_indexes, ); @@ -536,7 +545,7 @@ mod tests { ); check_compute_slice_indexes( - &[Some(i64::MIN), Some(99), Some(15)], + &[Some(-1), Some(99), Some(15)], &[0], 100, &[SliceIndex { @@ -550,7 +559,7 @@ mod tests { fn new_test_write_batch() -> WriteBatch { write_batch_util::new_write_batch( &[ - ("ts", LogicalTypeId::Int64, false), + ("ts", LogicalTypeId::Timestamp, false), ("value", LogicalTypeId::Int64, true), ], Some(0), @@ -559,7 +568,7 @@ mod tests { fn new_region_schema() -> RegionSchemaRef { let desc = RegionDescBuilder::new("test") - .timestamp(("ts", LogicalTypeId::Int64, false)) + .timestamp(("ts", LogicalTypeId::Timestamp, false)) .push_value_column(("value", LogicalTypeId::Int64, true)) .enable_version_column(false) .build(); @@ -570,7 +579,7 @@ mod tests { fn put_batch(batch: &mut WriteBatch, data: &[(i64, Option)]) { let mut put_data = PutData::with_num_columns(2); - let ts = Int64Vector::from_values(data.iter().map(|v| v.0)); + let ts = TimestampVector::from_values(data.iter().map(|v| v.0)); put_data.add_key_column("ts", Arc::new(ts)).unwrap(); let value = Int64Vector::from_iter(data.iter().map(|v| v.1)); put_data.add_value_column("value", Arc::new(value)).unwrap(); @@ -602,7 +611,7 @@ mod tests { for i in 0..row_num { let ts = batch.column(0).get(i); let v = batch.column(1).get(i); - assert_eq!(Value::from(data[index].0), ts); + assert_eq!(Value::Timestamp(Timestamp::from_millis(data[index].0)), ts); assert_eq!(Value::from(data[index].1), v); assert_eq!(Value::from(sequence), batch.column(2).get(i)); diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index 43072e5f1c..29571a9051 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -554,7 +554,7 @@ mod tests { fn test_descriptor_to_region_metadata() { let region_name = "region-0"; let desc = RegionDescBuilder::new(region_name) - .timestamp(("ts", LogicalTypeId::Int64, false)) + .timestamp(("ts", LogicalTypeId::Timestamp, false)) .enable_version_column(false) .push_key_column(("k1", LogicalTypeId::Int32, false)) .push_value_column(("v1", LogicalTypeId::Float32, true)) @@ -563,7 +563,7 @@ mod tests { let expect_schema = schema_util::new_schema_ref( &[ ("k1", LogicalTypeId::Int32, false), - ("ts", LogicalTypeId::Int64, false), + ("ts", LogicalTypeId::Timestamp, false), ("v1", LogicalTypeId::Float32, true), ], Some(1), @@ -663,10 +663,11 @@ mod tests { } fn new_metadata(enable_version_column: bool) -> RegionMetadata { - let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype()) - .is_nullable(false) - .build() - .unwrap(); + let timestamp = + ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::timestamp_millis_datatype()) + .is_nullable(false) + .build() + .unwrap(); let row_key = RowKeyDescriptorBuilder::new(timestamp) .push_column( ColumnDescriptorBuilder::new(3, "k1", ConcreteDataType::int64_datatype()) @@ -703,7 +704,7 @@ mod tests { let expect_schema = schema_util::new_schema_ref( &[ ("k1", LogicalTypeId::Int64, false), - ("ts", LogicalTypeId::Int64, false), + ("ts", LogicalTypeId::Timestamp, false), ("v1", LogicalTypeId::Int64, true), ], Some(1), @@ -750,7 +751,7 @@ mod tests { let expect_schema = schema_util::new_schema_ref( &[ ("k1", LogicalTypeId::Int64, false), - ("ts", LogicalTypeId::Int64, false), + ("ts", LogicalTypeId::Timestamp, false), (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), ("v1", LogicalTypeId::Int64, true), ],