diff --git a/src/common/time/src/timestamp_millis.rs b/src/common/time/src/timestamp_millis.rs index 4eddc58d41..ac31789303 100644 --- a/src/common/time/src/timestamp_millis.rs +++ b/src/common/time/src/timestamp_millis.rs @@ -21,24 +21,6 @@ impl TimestampMillis { TimestampMillis(ms) } - /// Returns the timestamp aligned by `bucket_duration` in milliseconds or - /// `None` if overflow occurred. - /// - /// # Panics - /// Panics if `bucket_duration <= 0`. - pub fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis> { - assert!(bucket_duration > 0); - - let ts = if self.0 >= 0 { - self.0 - } else { - // `bucket_duration > 0` implies `bucket_duration - 1` won't overflow. - self.0.checked_sub(bucket_duration - 1)? - }; - - Some(TimestampMillis(ts / bucket_duration * bucket_duration)) - } - /// Returns the timestamp value as i64. pub fn as_i64(&self) -> i64 { self.0 @@ -51,6 +33,12 @@ impl From<i64> for TimestampMillis { } } +impl From<TimestampMillis> for i64 { + fn from(ts: TimestampMillis) -> Self { + ts.0 + } +} + impl PartialEq<i64> for TimestampMillis { fn eq(&self, other: &i64) -> bool { self.0 == *other @@ -75,6 +63,25 @@ impl PartialOrd<TimestampMillis> for i64 { } } +pub trait BucketAligned { + /// Returns the timestamp aligned by `bucket_duration` in milliseconds or + /// `None` if overflow occurred. + /// + /// # Panics + /// Panics if `bucket_duration <= 0`. 
+ fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis>; +} + +impl<T: Into<i64>> BucketAligned for T { + fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis> { + assert!(bucket_duration > 0); + self.into() + .checked_div_euclid(bucket_duration) + .and_then(|val| val.checked_mul(bucket_duration)) + .map(TimestampMillis) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index fba752185b..255edca9f3 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use axum::http::StatusCode; use axum::Router; use axum_test_helper::TestClient; +use datatypes::prelude::ConcreteDataType; use servers::http::handler::ScriptExecution; use servers::http::HttpServer; use servers::server::Server; @@ -16,7 +17,9 @@ async fn make_test_app() -> (Router, TestGuard) { let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(); let instance = Arc::new(Instance::new(&opts).await.unwrap()); instance.start().await.unwrap(); - test_util::create_test_table(&instance).await.unwrap(); + test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype()) + .await + .unwrap(); let http_server = HttpServer::new(instance); (http_server.make_app(), guard) } diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index faa0316db5..8c23867fd5 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -1,11 +1,11 @@ -use arrow::array::UInt64Array; +use arrow::array::{Int64Array, UInt64Array}; use common_query::Output; use common_recordbatch::util; use datafusion::arrow_print; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow_array::StringArray; +use datatypes::prelude::ConcreteDataType; -use crate::error; use crate::instance::Instance; use crate::tests::test_util; @@ -17,7 +17,9 @@ async fn test_execute_insert() { 
let instance = Instance::new(&opts).await.unwrap(); instance.start().await.unwrap(); - test_util::create_test_table(&instance).await.unwrap(); + test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype()) + .await + .unwrap(); let output = instance .execute_sql( @@ -31,6 +33,45 @@ async fn test_execute_insert() { assert!(matches!(output, Output::AffectedRows(2))); } +#[tokio::test] +async fn test_execute_insert_query_with_i64_timestamp() { + common_telemetry::init_default_ut_logging(); + + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + test_util::create_test_table(&instance, ConcreteDataType::int64_datatype()) + .await + .unwrap(); + + let output = instance + .execute_sql( + r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(2))); + + let query_output = instance.execute_sql("select ts from demo").await.unwrap(); + + match query_output { + Output::Stream(s) => { + let batches = util::collect(s).await.unwrap(); + let columns = batches[0].df_recordbatch.columns(); + assert_eq!(1, columns.len()); + assert_eq!( + &Int64Array::from_slice(&[1655276557000, 1655276558000]), + columns[0].as_any().downcast_ref::<Int64Array>().unwrap() + ); + } + _ => unreachable!(), + } +} + #[tokio::test] async fn test_execute_query() { let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); @@ -110,7 +151,9 @@ async fn test_execute_show_databases_tables() { } // creat a table - test_util::create_test_table(&instance).await.unwrap(); + test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype()) + .await + .unwrap(); let output = instance.execute_sql("show tables").await.unwrap(); match output { @@ -187,8 +230,14 @@ pub async fn test_create_table_illegal_timestamp_type() 
{ PRIMARY KEY(host) ) engine=mito with(regions=1);"#, ) - .await; - assert!(matches!(output, Err(error::Error::CreateSchema { .. }))); + .await + .unwrap(); + match output { + Output::AffectedRows(rows) => { + assert_eq!(1, rows); + } + _ => unreachable!(), + } } #[tokio::test] @@ -198,7 +247,9 @@ async fn test_alter_table() { let instance = Instance::new_mock().await.unwrap(); instance.start().await.unwrap(); - test_util::create_test_table(&instance).await.unwrap(); + test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype()) + .await + .unwrap(); // make sure table insertion is ok before altering table instance .execute_sql("insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)") diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 19996908c6..2866443769 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -43,12 +43,12 @@ pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) { ) } -pub async fn create_test_table(instance: &Instance) -> Result<()> { +pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) -> Result<()> { let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true), + ColumnSchema::new("ts", ts_type, true), ]; let table_name = "demo"; diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index fc41c8d088..da9c4c8b5b 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -90,7 +90,10 @@ impl ConcreteDataType { } pub fn is_timestamp(&self) -> bool { - matches!(self, ConcreteDataType::Timestamp(_)) + matches!( + self, + ConcreteDataType::Timestamp(_) | 
ConcreteDataType::Int64(_) + ) } pub fn numerics() -> Vec<ConcreteDataType> { @@ -352,10 +355,7 @@ mod tests { assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond).is_timestamp()); assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond).is_timestamp()); assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond).is_timestamp()); - - // since timestamp data type is implemented, int64 is no longer allowed - // to be used a data type for timestamp column - assert!(!ConcreteDataType::int64_datatype().is_timestamp()); + assert!(ConcreteDataType::int64_datatype().is_timestamp()); } #[test] diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 50947601c0..851f0d11b3 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -5,6 +5,7 @@ use std::str::Utf8Error; use common_error::prelude::*; use datatypes::arrow; use datatypes::arrow::error::ArrowError; +use datatypes::prelude::ConcreteDataType; use serde_json::error::Error as JsonError; use store_api::manifest::action::ProtocolVersion; use store_api::manifest::ManifestVersion; @@ -296,6 +297,9 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display("Timestamp column type illegal, data type: {:?}", data_type))] + IllegalTimestampColumnType { data_type: ConcreteDataType }, + #[snafu(display( "Failed to convert between ColumnSchema and ColumnMetadata, source: {}", source @@ -320,7 +324,8 @@ impl ErrorExt for Error { | InvalidProjection { .. } | BuildBatch { .. } | NotInSchemaToCompat { .. } - | WriteToOldVersion { .. } => StatusCode::InvalidArguments, + | WriteToOldVersion { .. } + | IllegalTimestampColumnType { .. } => StatusCode::InvalidArguments, Utf8 { .. } | EncodeJson { .. 
} diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 8cdf8621c9..0baf4f2a52 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -1,14 +1,16 @@ use std::collections::HashMap; use std::time::Duration; +use common_time::timestamp_millis::BucketAligned; use common_time::{RangeMillis, TimestampMillis}; +use datatypes::data_type::ConcreteDataType; use datatypes::prelude::ScalarVector; use datatypes::schema::SchemaRef; -use datatypes::vectors::{TimestampVector, VectorRef}; +use datatypes::vectors::{Int64Vector, TimestampVector, VectorRef}; use snafu::OptionExt; use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber}; -use crate::error::{self, Result}; +use crate::error::{self, IllegalTimestampColumnTypeSnafu, Result}; use crate::memtable::{KeyValues, Memtable, MemtableSet}; use crate::write_batch::{Mutation, PutData, WriteBatch}; @@ -144,12 +146,31 @@ impl Inserter { column: &timestamp_schema.name, }, )?; - let timestamps = timestamps - .as_any() - .downcast_ref() - .context(error::BatchMissingTimestampSnafu)?; - let slice_indexes = - compute_slice_indexes(timestamps, self.bucket_duration, &self.time_range_indexes); + + let slice_indexes = match timestamps.data_type() { + ConcreteDataType::Int64(_) => { + let timestamps: &Int64Vector = timestamps + .as_any() + .downcast_ref() + .context(error::BatchMissingTimestampSnafu)?; + let iter = timestamps.iter_data(); + compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes) + } + ConcreteDataType::Timestamp(_) => { + let timestamps: &TimestampVector = timestamps + .as_any() + .downcast_ref() + .context(error::BatchMissingTimestampSnafu)?; + let iter = timestamps.iter_data().map(|v| v.map(|v| v.value())); + compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes) + } + _ => { + return IllegalTimestampColumnTypeSnafu { + data_type: timestamps.data_type(), + } + .fail(); + } + }; for slice_index in 
slice_indexes { let sliced_data = put_data.slice(slice_index.start, slice_index.end); @@ -216,8 +237,8 @@ struct SliceIndex { /// # Panics /// Panics if the duration is too large to be represented by i64, or `timestamps` are not all /// included by `time_range_indexes`. -fn compute_slice_indexes( - timestamps: &TimestampVector, +fn compute_slice_indices<I: Iterator<Item = Option<i64>>>( + timestamps: I, duration: Duration, time_range_indexes: &RangeIndexMap, ) -> Vec<SliceIndex> { @@ -225,6 +246,7 @@ fn compute_slice_indexes( .as_millis() .try_into() .unwrap_or_else(|e| panic!("Duration {:?} too large, {}", duration, e)); + let mut slice_indexes = Vec::with_capacity(time_range_indexes.len()); // Current start and end of a valid `SliceIndex`. let (mut start, mut end) = (0, 0); @@ -232,10 +254,11 @@ fn compute_slice_indexes( let mut last_range_index = None; // Iterate all timestamps, split timestamps by its time range. - for (i, ts) in timestamps.iter_data().enumerate() { + for (i, ts) in timestamps.enumerate() { // Find index for time range of the timestamp. 
+ let current_range_index = ts - .and_then(|v| TimestampMillis::new(v.value()).align_by_bucket(duration_ms)) + .and_then(|v| v.align_by_bucket(duration_ms)) .and_then(|aligned| time_range_indexes.get(&aligned).copied()); match current_range_index { @@ -300,7 +323,9 @@ mod tests { use common_time::timestamp::Timestamp; use datatypes::prelude::ScalarVectorBuilder; - use datatypes::vectors::{Int64Vector, TimestampVector, TimestampVectorBuilder}; + use datatypes::vectors::{ + Int64Vector, Int64VectorBuilder, TimestampVector, TimestampVectorBuilder, + }; use datatypes::{type_id::LogicalTypeId, value::Value}; use store_api::storage::{PutOperation, WriteRequest}; @@ -340,8 +365,8 @@ mod tests { let time_ranges = new_time_ranges(range_starts, duration); let time_range_indexes = new_range_index_map(&time_ranges); - let slice_indexes = compute_slice_indexes( - &ts_vec, + let slice_indexes = compute_slice_indices( + ts_vec.iter_data().map(|v| v.map(|v| v.value())), Duration::from_millis(duration as u64), &time_range_indexes, ); @@ -349,6 +374,82 @@ mod tests { assert_eq!(expect, slice_indexes); } + #[test] + fn test_check_compute_slice_indexes_i64() { + let timestamps = &[Some(99), Some(13), Some(18), Some(234)]; + let range_starts = &[0, 200]; + let duration = 100; + + let mut builder = Int64VectorBuilder::with_capacity(timestamps.len()); + for v in timestamps { + builder.push(*v); + } + + let ts_vec = builder.finish(); + + let time_ranges = new_time_ranges(range_starts, duration); + let time_range_indexes = new_range_index_map(&time_ranges); + + let slice_indexes = compute_slice_indices( + ts_vec.iter_data(), + Duration::from_millis(duration as u64), + &time_range_indexes, + ); + assert_eq!( + vec![ + SliceIndex { + start: 0, + end: 3, + range_index: 0, + }, + SliceIndex { + start: 3, + end: 4, + range_index: 1, + }, + ], + slice_indexes + ); + } + + #[test] + fn test_check_compute_slice_indexes_timestamp() { + let timestamps = &[Some(99), Some(13), Some(18), Some(234)]; + 
let range_starts = &[0, 200]; + let duration = 100; + + let mut builder = TimestampVectorBuilder::with_capacity(timestamps.len()); + for v in timestamps { + builder.push(v.map(Timestamp::from_millis)); + } + + let ts_vec = builder.finish(); + + let time_ranges = new_time_ranges(range_starts, duration); + let time_range_indexes = new_range_index_map(&time_ranges); + + let slice_indexes = compute_slice_indices( + ts_vec.iter_data().map(|v| v.map(|v| v.value())), + Duration::from_millis(duration as u64), + &time_range_indexes, + ); + assert_eq!( + vec![ + SliceIndex { + start: 0, + end: 3, + range_index: 0, + }, + SliceIndex { + start: 3, + end: 4, + range_index: 1, + }, + ], + slice_indexes + ); + } + #[test] fn test_compute_slice_indexes_valid() { // Test empty input. diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 020d2b3107..2e15dbabe5 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -8,9 +8,10 @@ use std::{ }; use common_error::prelude::*; -use common_time::{RangeMillis, TimestampMillis}; +use common_time::timestamp_millis::BucketAligned; +use common_time::RangeMillis; use datatypes::schema::{ColumnSchema, SchemaRef}; -use datatypes::vectors::TimestampVector; +use datatypes::vectors::{Int64Vector, TimestampVector}; use datatypes::{ arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector, prelude::Value, vectors::VectorRef, @@ -216,11 +217,26 @@ impl WriteRequest for WriteBatch { aligned_timestamps.insert(aligned); } else { - let ts_vector = column.as_any().downcast_ref::<TimestampVector>().unwrap(); // not expected to fail - for ts in ts_vector.iter_data().flatten() { - let aligned = align_timestamp(ts.value(), durations_millis) - .context(TimestampOverflowSnafu { ts: ts.value() })?; - aligned_timestamps.insert(aligned); + match column.data_type() { + ConcreteDataType::Timestamp(_) => { + let ts_vector = + column.as_any().downcast_ref::<TimestampVector>().unwrap(); + for ts in 
ts_vector.iter_data().flatten() { + let aligned = align_timestamp(ts.value(), durations_millis) + .context(TimestampOverflowSnafu { ts: ts.value() })?; + aligned_timestamps.insert(aligned); + } + } + ConcreteDataType::Int64(_) => { + let ts_vector = + column.as_any().downcast_ref::<Int64Vector>().unwrap(); + for ts in ts_vector.iter_data().flatten() { + let aligned = align_timestamp(ts, durations_millis) + .context(TimestampOverflowSnafu { ts })?; + aligned_timestamps.insert(aligned); + } + } + _ => unreachable!(), } } } @@ -250,7 +266,7 @@ impl WriteRequest for WriteBatch { /// So timestamp within `[i64::MIN, i64::MIN + duration)` or /// `[i64::MAX-(i64::MAX%duration), i64::MAX]` is not a valid input. fn align_timestamp(ts: i64, duration: i64) -> Option<i64> { - let aligned = TimestampMillis::new(ts).align_by_bucket(duration)?.as_i64(); + let aligned = ts.align_by_bucket(duration)?.as_i64(); // Also ensure end timestamp won't overflow. aligned.checked_add(duration)?; Some(aligned) @@ -955,7 +971,7 @@ mod tests { #[test] pub fn test_align_timestamp_overflow() { assert_eq!(Some(i64::MIN), align_timestamp(i64::MIN, 1)); - assert_eq!(None, align_timestamp(i64::MIN, 2)); + assert_eq!(Some(-9223372036854775808), align_timestamp(i64::MIN, 2)); assert_eq!( Some(((i64::MIN + 20) / 20 - 1) * 20), align_timestamp(i64::MIN + 20, 20)