fix: forbid using int64 as timestamp column data type (#248)

* fix: forbid using int64 as timestamp column data type

* fix unit test

* fix unit tests

* change gmt_created and gmt_modified data type in system tables to timestamp

* also change data type in readme
This commit is contained in:
Lei, Huang
2022-09-14 12:03:16 +08:00
committed by GitHub
parent 20dcaa6897
commit 2dbaad9770
11 changed files with 103 additions and 47 deletions

View File

@@ -121,7 +121,7 @@ cargo run -- --log-dir=logs --log-level=debug frontend start -c ./config/fronten
```SQL
CREATE TABLE monitor (
host STRING,
ts BIGINT,
ts TIMESTAMP,
cpu DOUBLE DEFAULT 0,
memory DOUBLE,
TIME INDEX (ts),

View File

@@ -9,7 +9,7 @@ use common_time::timestamp::Timestamp;
use common_time::util;
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use datatypes::vectors::{BinaryVector, Int64Vector, TimestampVector, UInt8Vector};
use datatypes::vectors::{BinaryVector, TimestampVector, UInt8Vector};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
@@ -140,12 +140,12 @@ fn build_system_catalog_schema() -> Schema {
),
ColumnSchema::new(
"gmt_created".to_string(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::timestamp_millis_datatype(),
false,
),
ColumnSchema::new(
"gmt_modified".to_string(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::timestamp_millis_datatype(),
false,
),
];
@@ -186,12 +186,16 @@ pub fn build_table_insert_request(full_table_name: String, table_id: TableId) ->
columns_values.insert(
"gmt_created".to_string(),
Arc::new(Int64Vector::from_slice(&[util::current_time_millis()])) as _,
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
util::current_time_millis(),
)])) as _,
);
columns_values.insert(
"gmt_modified".to_string(),
Arc::new(Int64Vector::from_slice(&[util::current_time_millis()])) as _,
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
util::current_time_millis(),
)])) as _,
);
InsertRequest {

View File

@@ -234,7 +234,7 @@ mod tests {
},
ColumnDef {
name: "ts".to_string(),
data_type: 4, // int64
data_type: 15, // timestamp
is_nullable: false,
},
ColumnDef {
@@ -270,7 +270,7 @@ mod tests {
},
ColumnSchema {
name: "ts".to_string(),
data_type: ConcreteDataType::int64_datatype(),
data_type: ConcreteDataType::timestamp_millis_datatype(),
is_nullable: false,
},
ColumnSchema {

View File

@@ -207,6 +207,7 @@ mod tests {
use common_base::BitVec;
use common_query::prelude::Expr;
use common_recordbatch::SendableRecordBatchStream;
use common_time::timestamp::Timestamp;
use datatypes::{
data_type::ConcreteDataType,
schema::{ColumnSchema, SchemaBuilder, SchemaRef},
@@ -241,8 +242,8 @@ mod tests {
assert_eq!(Value::Float64(0.1.into()), memory.get(1));
let ts = insert_req.columns_values.get("ts").unwrap();
assert_eq!(Value::Int64(100), ts.get(0));
assert_eq!(Value::Int64(101), ts.get(1));
assert_eq!(Value::Timestamp(Timestamp::from_millis(100)), ts.get(0));
assert_eq!(Value::Timestamp(Timestamp::from_millis(101)), ts.get(1));
}
#[test]
@@ -292,7 +293,7 @@ mod tests {
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true),
];
Arc::new(
@@ -314,7 +315,7 @@ mod tests {
fn mock_insert_batches() -> Vec<Vec<u8>> {
const SEMANTIC_TAG: i32 = 0;
const SEMANTIC_FEILD: i32 = 1;
const SEMANTIC_FIELD: i32 = 1;
const SEMANTIC_TS: i32 = 2;
let row_count = 2;
@@ -337,7 +338,7 @@ mod tests {
};
let cpu_column = Column {
column_name: "cpu".to_string(),
semantic_type: SEMANTIC_FEILD,
semantic_type: SEMANTIC_FIELD,
values: Some(cpu_vals),
null_mask: vec![2],
..Default::default()
@@ -349,14 +350,14 @@ mod tests {
};
let mem_column = Column {
column_name: "memory".to_string(),
semantic_type: SEMANTIC_FEILD,
semantic_type: SEMANTIC_FIELD,
values: Some(mem_vals),
null_mask: vec![1],
..Default::default()
};
let ts_vals = column::Values {
i64_values: vec![100, 101],
ts_millis_values: vec![100, 101],
..Default::default()
};
let ts_column = Column {
@@ -364,7 +365,7 @@ mod tests {
semantic_type: SEMANTIC_TS,
values: Some(ts_vals),
null_mask: vec![0],
..Default::default()
datatype: 15,
};
let insert_batch = InsertBatch {

View File

@@ -62,6 +62,7 @@ mod tests {
use catalog::SchemaProvider;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use common_time::timestamp::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
@@ -92,7 +93,7 @@ mod tests {
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true),
];
Arc::new(
@@ -208,8 +209,14 @@ mod tests {
let ts = &columns_values["ts"];
assert_eq!(2, ts.len());
assert_eq!(Value::from(1655276557000i64), ts.get(0));
assert_eq!(Value::from(1655276558000i64), ts.get(1));
assert_eq!(
Value::from(Timestamp::from_millis(1655276557000i64)),
ts.get(0)
);
assert_eq!(
Value::from(Timestamp::from_millis(1655276558000i64)),
ts.get(1)
);
}
_ => {
panic!("Not supposed to reach here")

View File

@@ -183,7 +183,7 @@ mod tests {
#[tokio::test]
pub async fn test_create_to_request() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement("create table demo_table( host string, ts bigint, cpu double default 0, memory double, TIME INDEX (ts), PRIMARY KEY(ts, host)) engine=mito with(regions=1);");
let parsed_stmt = sql_to_statement("create table demo_table( host string, ts timestamp, cpu double default 0, memory double, TIME INDEX (ts), PRIMARY KEY(ts, host)) engine=mito with(regions=1);");
let c = handler.create_to_request(42, parsed_stmt).unwrap();
assert_eq!("demo_table", c.table_name);
assert_eq!(42, c.id);
@@ -209,7 +209,7 @@ mod tests {
pub async fn test_primary_key_not_specified() {
let handler = create_mock_sql_handler().await;
let parsed_stmt = sql_to_statement("create table demo_table( host string, ts bigint, cpu double default 0, memory double, TIME INDEX (ts)) engine=mito with(regions=1);");
let parsed_stmt = sql_to_statement("create table demo_table( host string, ts timestamp, cpu double default 0, memory double, TIME INDEX (ts)) engine=mito with(regions=1);");
let c = handler.create_to_request(42, parsed_stmt).unwrap();
assert_eq!(1, c.primary_key_indices.len());
@@ -237,7 +237,7 @@ mod tests {
let create_table = sql_to_statement(
r"create table c.s.demo(
host string,
ts bigint,
ts timestamp,
cpu double default 0,
memory double,
TIME INDEX (ts),
@@ -267,7 +267,7 @@ mod tests {
.data_type
);
assert_eq!(
ConcreteDataType::int64_datatype(),
ConcreteDataType::timestamp_millis_datatype(),
request
.schema
.column_schema_by_name("ts")

View File

@@ -4,6 +4,7 @@ use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use query::Output;
use crate::error;
use crate::instance::Instance;
use crate::tests::test_util;
@@ -67,7 +68,7 @@ pub async fn test_execute_create() {
.execute_sql(
r#"create table test_table(
host string,
ts bigint,
ts timestamp,
cpu double default 0,
memory double,
TIME INDEX (ts),
@@ -79,6 +80,29 @@ pub async fn test_execute_create() {
assert!(matches!(output, Output::AffectedRows(1)));
}
/// Creating a table whose TIME INDEX column is declared as `bigint` must fail.
///
/// Starts an `Instance` built from temporary datanode options, executes a
/// `create table` statement that uses `ts bigint` as the time index, and expects
/// the result to match `Err(error::Error::CreateSchema { .. })`.
#[tokio::test]
pub async fn test_create_table_illegal_timestamp_type() {
common_telemetry::init_default_ut_logging();
// NOTE(review): `_guard` presumably keeps the temporary directory alive for the
// duration of the test — confirm against `test_util`.
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
// The statement is deliberately invalid: `ts` is the TIME INDEX but is typed `bigint`.
let output = instance
.execute_sql(
r#"create table test_table(
host string,
ts bigint,
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(ts, host)
) engine=mito with(regions=1);"#,
)
.await;
// The result is not unwrapped: the test only passes if it is the `CreateSchema` error.
assert!(matches!(output, Err(error::Error::CreateSchema { .. })));
}
#[tokio::test]
async fn test_alter_table() {
common_telemetry::init_default_ut_logging();

View File

@@ -87,10 +87,7 @@ impl ConcreteDataType {
}
pub fn is_timestamp(&self) -> bool {
matches!(
self,
ConcreteDataType::Int64(_) | ConcreteDataType::Timestamp(_)
)
matches!(self, ConcreteDataType::Timestamp(_))
}
pub fn numerics() -> Vec<ConcreteDataType> {
@@ -340,4 +337,17 @@ mod tests {
ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Second)
);
}
/// Checks that `is_timestamp()` returns true for the timestamp data type in
/// second, millisecond, microsecond and nanosecond units, and false for `int64`.
#[test]
pub fn test_is_timestamp() {
assert!(ConcreteDataType::timestamp_millis_datatype().is_timestamp());
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Second).is_timestamp());
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond).is_timestamp());
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond).is_timestamp());
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond).is_timestamp());
// Since the timestamp data type is implemented, int64 is no longer allowed
// to be used as a data type for the timestamp column.
assert!(!ConcreteDataType::int64_datatype().is_timestamp());
}
}

View File

@@ -382,7 +382,7 @@ mod tests {
fn test_schema_with_timestamp() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
];
let schema = SchemaBuilder::from(column_schemas.clone())
.timestamp_index(1)

View File

@@ -5,7 +5,7 @@ use std::time::Duration;
use common_time::{RangeMillis, TimestampMillis};
use datatypes::prelude::ScalarVector;
use datatypes::schema::SchemaRef;
use datatypes::vectors::{Int64Vector, NullVector, VectorRef};
use datatypes::vectors::{NullVector, TimestampVector, VectorRef};
use snafu::{ensure, OptionExt};
use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber};
@@ -210,7 +210,7 @@ struct SliceIndex {
/// Panics if the duration is too large to be represented by i64, or `timestamps` are not all
/// included by `time_range_indexes`.
fn compute_slice_indexes(
timestamps: &Int64Vector,
timestamps: &TimestampVector,
duration: Duration,
time_range_indexes: &RangeIndexMap,
) -> Vec<SliceIndex> {
@@ -228,7 +228,7 @@ fn compute_slice_indexes(
for (i, ts) in timestamps.iter_data().enumerate() {
// Find index for time range of the timestamp.
let current_range_index = ts
.and_then(|v| TimestampMillis::new(v).align_by_bucket(duration_ms))
.and_then(|v| TimestampMillis::new(v.value()).align_by_bucket(duration_ms))
.and_then(|aligned| time_range_indexes.get(&aligned).copied());
match current_range_index {
@@ -289,6 +289,9 @@ fn compute_slice_indexes(
#[cfg(test)]
mod tests {
use common_time::timestamp::Timestamp;
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::vectors::{Int64Vector, TimestampVector, TimestampVectorBuilder};
use datatypes::{type_id::LogicalTypeId, value::Value};
use store_api::storage::{PutOperation, WriteRequest};
@@ -318,12 +321,18 @@ mod tests {
) {
assert!(duration > 0);
let timestamps = Int64Vector::from_iter(timestamps.iter());
let mut builder = TimestampVectorBuilder::with_capacity(0);
for v in timestamps {
builder.push(v.map(common_time::timestamp::Timestamp::from_millis));
}
let ts_vec = builder.finish();
let time_ranges = new_time_ranges(range_starts, duration);
let time_range_indexes = new_range_index_map(&time_ranges);
let slice_indexes = compute_slice_indexes(
&timestamps,
&ts_vec,
Duration::from_millis(duration as u64),
&time_range_indexes,
);
@@ -536,7 +545,7 @@ mod tests {
);
check_compute_slice_indexes(
&[Some(i64::MIN), Some(99), Some(15)],
&[Some(-1), Some(99), Some(15)],
&[0],
100,
&[SliceIndex {
@@ -550,7 +559,7 @@ mod tests {
fn new_test_write_batch() -> WriteBatch {
write_batch_util::new_write_batch(
&[
("ts", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Timestamp, false),
("value", LogicalTypeId::Int64, true),
],
Some(0),
@@ -559,7 +568,7 @@ mod tests {
fn new_region_schema() -> RegionSchemaRef {
let desc = RegionDescBuilder::new("test")
.timestamp(("ts", LogicalTypeId::Int64, false))
.timestamp(("ts", LogicalTypeId::Timestamp, false))
.push_value_column(("value", LogicalTypeId::Int64, true))
.enable_version_column(false)
.build();
@@ -570,7 +579,7 @@ mod tests {
fn put_batch(batch: &mut WriteBatch, data: &[(i64, Option<i64>)]) {
let mut put_data = PutData::with_num_columns(2);
let ts = Int64Vector::from_values(data.iter().map(|v| v.0));
let ts = TimestampVector::from_values(data.iter().map(|v| v.0));
put_data.add_key_column("ts", Arc::new(ts)).unwrap();
let value = Int64Vector::from_iter(data.iter().map(|v| v.1));
put_data.add_value_column("value", Arc::new(value)).unwrap();
@@ -602,7 +611,7 @@ mod tests {
for i in 0..row_num {
let ts = batch.column(0).get(i);
let v = batch.column(1).get(i);
assert_eq!(Value::from(data[index].0), ts);
assert_eq!(Value::Timestamp(Timestamp::from_millis(data[index].0)), ts);
assert_eq!(Value::from(data[index].1), v);
assert_eq!(Value::from(sequence), batch.column(2).get(i));

View File

@@ -554,7 +554,7 @@ mod tests {
fn test_descriptor_to_region_metadata() {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.timestamp(("ts", LogicalTypeId::Int64, false))
.timestamp(("ts", LogicalTypeId::Timestamp, false))
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
@@ -563,7 +563,7 @@ mod tests {
let expect_schema = schema_util::new_schema_ref(
&[
("k1", LogicalTypeId::Int32, false),
("ts", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Timestamp, false),
("v1", LogicalTypeId::Float32, true),
],
Some(1),
@@ -663,10 +663,11 @@ mod tests {
}
fn new_metadata(enable_version_column: bool) -> RegionMetadata {
let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype())
.is_nullable(false)
.build()
.unwrap();
let timestamp =
ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::timestamp_millis_datatype())
.is_nullable(false)
.build()
.unwrap();
let row_key = RowKeyDescriptorBuilder::new(timestamp)
.push_column(
ColumnDescriptorBuilder::new(3, "k1", ConcreteDataType::int64_datatype())
@@ -703,7 +704,7 @@ mod tests {
let expect_schema = schema_util::new_schema_ref(
&[
("k1", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Timestamp, false),
("v1", LogicalTypeId::Int64, true),
],
Some(1),
@@ -750,7 +751,7 @@ mod tests {
let expect_schema = schema_util::new_schema_ref(
&[
("k1", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Timestamp, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Int64, true),
],