feat: timestamp column support i64 (#325)

* feat: align_bucket support i64 and timestamp values

* feat: add Int64 to timestamp

* feat: support query i64 timestamp vector

* test: fix failling tests

* refactor: simplify some code

* fix: CR comments and add insert and query test for i64 timestamp column
This commit is contained in:
Lei, Huang
2022-10-28 18:39:11 +08:00
committed by GitHub
parent 3e8d9b421c
commit 81716d622e
8 changed files with 241 additions and 58 deletions

View File

@@ -21,24 +21,6 @@ impl TimestampMillis {
TimestampMillis(ms)
}
/// Returns the timestamp aligned by `bucket_duration` in milliseconds or
/// `None` if overflow occurred.
///
/// # Panics
/// Panics if `bucket_duration <= 0`.
pub fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis> {
assert!(bucket_duration > 0);
let ts = if self.0 >= 0 {
self.0
} else {
// `bucket_duration > 0` implies `bucket_duration - 1` won't overflow.
self.0.checked_sub(bucket_duration - 1)?
};
Some(TimestampMillis(ts / bucket_duration * bucket_duration))
}
/// Returns the timestamp value as i64.
pub fn as_i64(&self) -> i64 {
self.0
@@ -51,6 +33,12 @@ impl From<i64> for TimestampMillis {
}
}
impl From<TimestampMillis> for i64 {
fn from(ts: TimestampMillis) -> Self {
ts.0
}
}
impl PartialEq<i64> for TimestampMillis {
fn eq(&self, other: &i64) -> bool {
self.0 == *other
@@ -75,6 +63,25 @@ impl PartialOrd<TimestampMillis> for i64 {
}
}
pub trait BucketAligned {
/// Returns the timestamp aligned by `bucket_duration` in milliseconds or
/// `None` if overflow occurred.
///
/// # Panics
/// Panics if `bucket_duration <= 0`.
fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis>;
}
impl<T: Into<i64>> BucketAligned for T {
fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis> {
assert!(bucket_duration > 0);
self.into()
.checked_div_euclid(bucket_duration)
.and_then(|val| val.checked_mul(bucket_duration))
.map(TimestampMillis)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use axum::http::StatusCode;
use axum::Router;
use axum_test_helper::TestClient;
use datatypes::prelude::ConcreteDataType;
use servers::http::handler::ScriptExecution;
use servers::http::HttpServer;
use servers::server::Server;
@@ -16,7 +17,9 @@ async fn make_test_app() -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype())
.await
.unwrap();
let http_server = HttpServer::new(instance);
(http_server.make_app(), guard)
}

View File

@@ -1,11 +1,11 @@
use arrow::array::UInt64Array;
use arrow::array::{Int64Array, UInt64Array};
use common_query::Output;
use common_recordbatch::util;
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow_array::StringArray;
use datatypes::prelude::ConcreteDataType;
use crate::error;
use crate::instance::Instance;
use crate::tests::test_util;
@@ -17,7 +17,9 @@ async fn test_execute_insert() {
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype())
.await
.unwrap();
let output = instance
.execute_sql(
@@ -31,6 +33,45 @@ async fn test_execute_insert() {
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test]
async fn test_execute_insert_query_with_i64_timestamp() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::int64_datatype())
.await
.unwrap();
let output = instance
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(2)));
let query_output = instance.execute_sql("select ts from demo").await.unwrap();
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
let columns = batches[0].df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(
&Int64Array::from_slice(&[1655276557000, 1655276558000]),
columns[0].as_any().downcast_ref::<Int64Array>().unwrap()
);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_execute_query() {
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
@@ -110,7 +151,9 @@ async fn test_execute_show_databases_tables() {
}
// creat a table
test_util::create_test_table(&instance).await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype())
.await
.unwrap();
let output = instance.execute_sql("show tables").await.unwrap();
match output {
@@ -187,8 +230,14 @@ pub async fn test_create_table_illegal_timestamp_type() {
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
)
.await;
assert!(matches!(output, Err(error::Error::CreateSchema { .. })));
.await
.unwrap();
match output {
Output::AffectedRows(rows) => {
assert_eq!(1, rows);
}
_ => unreachable!(),
}
}
#[tokio::test]
@@ -198,7 +247,9 @@ async fn test_alter_table() {
let instance = Instance::new_mock().await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype())
.await
.unwrap();
// make sure table insertion is ok before altering table
instance
.execute_sql("insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)")

View File

@@ -43,12 +43,12 @@ pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) {
)
}
pub async fn create_test_table(instance: &Instance) -> Result<()> {
pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) -> Result<()> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), true),
ColumnSchema::new("ts", ts_type, true),
];
let table_name = "demo";

View File

@@ -90,7 +90,10 @@ impl ConcreteDataType {
}
pub fn is_timestamp(&self) -> bool {
matches!(self, ConcreteDataType::Timestamp(_))
matches!(
self,
ConcreteDataType::Timestamp(_) | ConcreteDataType::Int64(_)
)
}
pub fn numerics() -> Vec<ConcreteDataType> {
@@ -352,10 +355,7 @@ mod tests {
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond).is_timestamp());
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond).is_timestamp());
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond).is_timestamp());
// since timestamp data type is implemented, int64 is no longer allowed
// to be used a data type for timestamp column
assert!(!ConcreteDataType::int64_datatype().is_timestamp());
assert!(ConcreteDataType::int64_datatype().is_timestamp());
}
#[test]

View File

@@ -5,6 +5,7 @@ use std::str::Utf8Error;
use common_error::prelude::*;
use datatypes::arrow;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use serde_json::error::Error as JsonError;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
@@ -296,6 +297,9 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Timestamp column type illegal, data type: {:?}", data_type))]
IllegalTimestampColumnType { data_type: ConcreteDataType },
#[snafu(display(
"Failed to convert between ColumnSchema and ColumnMetadata, source: {}",
source
@@ -320,7 +324,8 @@ impl ErrorExt for Error {
| InvalidProjection { .. }
| BuildBatch { .. }
| NotInSchemaToCompat { .. }
| WriteToOldVersion { .. } => StatusCode::InvalidArguments,
| WriteToOldVersion { .. }
| IllegalTimestampColumnType { .. } => StatusCode::InvalidArguments,
Utf8 { .. }
| EncodeJson { .. }

View File

@@ -1,14 +1,16 @@
use std::collections::HashMap;
use std::time::Duration;
use common_time::timestamp_millis::BucketAligned;
use common_time::{RangeMillis, TimestampMillis};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::ScalarVector;
use datatypes::schema::SchemaRef;
use datatypes::vectors::{TimestampVector, VectorRef};
use datatypes::vectors::{Int64Vector, TimestampVector, VectorRef};
use snafu::OptionExt;
use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber};
use crate::error::{self, Result};
use crate::error::{self, IllegalTimestampColumnTypeSnafu, Result};
use crate::memtable::{KeyValues, Memtable, MemtableSet};
use crate::write_batch::{Mutation, PutData, WriteBatch};
@@ -144,12 +146,31 @@ impl Inserter {
column: &timestamp_schema.name,
},
)?;
let timestamps = timestamps
.as_any()
.downcast_ref()
.context(error::BatchMissingTimestampSnafu)?;
let slice_indexes =
compute_slice_indexes(timestamps, self.bucket_duration, &self.time_range_indexes);
let slice_indexes = match timestamps.data_type() {
ConcreteDataType::Int64(_) => {
let timestamps: &Int64Vector = timestamps
.as_any()
.downcast_ref()
.context(error::BatchMissingTimestampSnafu)?;
let iter = timestamps.iter_data();
compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes)
}
ConcreteDataType::Timestamp(_) => {
let timestamps: &TimestampVector = timestamps
.as_any()
.downcast_ref()
.context(error::BatchMissingTimestampSnafu)?;
let iter = timestamps.iter_data().map(|v| v.map(|v| v.value()));
compute_slice_indices(iter, self.bucket_duration, &self.time_range_indexes)
}
_ => {
return IllegalTimestampColumnTypeSnafu {
data_type: timestamps.data_type(),
}
.fail();
}
};
for slice_index in slice_indexes {
let sliced_data = put_data.slice(slice_index.start, slice_index.end);
@@ -216,8 +237,8 @@ struct SliceIndex {
/// # Panics
/// Panics if the duration is too large to be represented by i64, or `timestamps` are not all
/// included by `time_range_indexes`.
fn compute_slice_indexes(
timestamps: &TimestampVector,
fn compute_slice_indices<I: Iterator<Item = Option<i64>>>(
timestamps: I,
duration: Duration,
time_range_indexes: &RangeIndexMap,
) -> Vec<SliceIndex> {
@@ -225,6 +246,7 @@ fn compute_slice_indexes(
.as_millis()
.try_into()
.unwrap_or_else(|e| panic!("Duration {:?} too large, {}", duration, e));
let mut slice_indexes = Vec::with_capacity(time_range_indexes.len());
// Current start and end of a valid `SliceIndex`.
let (mut start, mut end) = (0, 0);
@@ -232,10 +254,11 @@ fn compute_slice_indexes(
let mut last_range_index = None;
// Iterate all timestamps, split timestamps by its time range.
for (i, ts) in timestamps.iter_data().enumerate() {
for (i, ts) in timestamps.enumerate() {
// Find index for time range of the timestamp.
let current_range_index = ts
.and_then(|v| TimestampMillis::new(v.value()).align_by_bucket(duration_ms))
.and_then(|v| v.align_by_bucket(duration_ms))
.and_then(|aligned| time_range_indexes.get(&aligned).copied());
match current_range_index {
@@ -300,7 +323,9 @@ mod tests {
use common_time::timestamp::Timestamp;
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::vectors::{Int64Vector, TimestampVector, TimestampVectorBuilder};
use datatypes::vectors::{
Int64Vector, Int64VectorBuilder, TimestampVector, TimestampVectorBuilder,
};
use datatypes::{type_id::LogicalTypeId, value::Value};
use store_api::storage::{PutOperation, WriteRequest};
@@ -340,8 +365,8 @@ mod tests {
let time_ranges = new_time_ranges(range_starts, duration);
let time_range_indexes = new_range_index_map(&time_ranges);
let slice_indexes = compute_slice_indexes(
&ts_vec,
let slice_indexes = compute_slice_indices(
ts_vec.iter_data().map(|v| v.map(|v| v.value())),
Duration::from_millis(duration as u64),
&time_range_indexes,
);
@@ -349,6 +374,82 @@ mod tests {
assert_eq!(expect, slice_indexes);
}
#[test]
fn test_check_compute_slice_indexes_i64() {
let timestamps = &[Some(99), Some(13), Some(18), Some(234)];
let range_starts = &[0, 200];
let duration = 100;
let mut builder = Int64VectorBuilder::with_capacity(timestamps.len());
for v in timestamps {
builder.push(*v);
}
let ts_vec = builder.finish();
let time_ranges = new_time_ranges(range_starts, duration);
let time_range_indexes = new_range_index_map(&time_ranges);
let slice_indexes = compute_slice_indices(
ts_vec.iter_data(),
Duration::from_millis(duration as u64),
&time_range_indexes,
);
assert_eq!(
vec![
SliceIndex {
start: 0,
end: 3,
range_index: 0,
},
SliceIndex {
start: 3,
end: 4,
range_index: 1,
},
],
slice_indexes
);
}
#[test]
fn test_check_compute_slice_indexes_timestamp() {
let timestamps = &[Some(99), Some(13), Some(18), Some(234)];
let range_starts = &[0, 200];
let duration = 100;
let mut builder = TimestampVectorBuilder::with_capacity(timestamps.len());
for v in timestamps {
builder.push(v.map(Timestamp::from_millis));
}
let ts_vec = builder.finish();
let time_ranges = new_time_ranges(range_starts, duration);
let time_range_indexes = new_range_index_map(&time_ranges);
let slice_indexes = compute_slice_indices(
ts_vec.iter_data().map(|v| v.map(|v| v.value())),
Duration::from_millis(duration as u64),
&time_range_indexes,
);
assert_eq!(
vec![
SliceIndex {
start: 0,
end: 3,
range_index: 0,
},
SliceIndex {
start: 3,
end: 4,
range_index: 1,
},
],
slice_indexes
);
}
#[test]
fn test_compute_slice_indexes_valid() {
// Test empty input.

View File

@@ -8,9 +8,10 @@ use std::{
};
use common_error::prelude::*;
use common_time::{RangeMillis, TimestampMillis};
use common_time::timestamp_millis::BucketAligned;
use common_time::RangeMillis;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::vectors::TimestampVector;
use datatypes::vectors::{Int64Vector, TimestampVector};
use datatypes::{
arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector, prelude::Value,
vectors::VectorRef,
@@ -216,11 +217,26 @@ impl WriteRequest for WriteBatch {
aligned_timestamps.insert(aligned);
} else {
let ts_vector = column.as_any().downcast_ref::<TimestampVector>().unwrap(); // not expected to fail
for ts in ts_vector.iter_data().flatten() {
let aligned = align_timestamp(ts.value(), durations_millis)
.context(TimestampOverflowSnafu { ts: ts.value() })?;
aligned_timestamps.insert(aligned);
match column.data_type() {
ConcreteDataType::Timestamp(_) => {
let ts_vector =
column.as_any().downcast_ref::<TimestampVector>().unwrap();
for ts in ts_vector.iter_data().flatten() {
let aligned = align_timestamp(ts.value(), durations_millis)
.context(TimestampOverflowSnafu { ts: ts.value() })?;
aligned_timestamps.insert(aligned);
}
}
ConcreteDataType::Int64(_) => {
let ts_vector =
column.as_any().downcast_ref::<Int64Vector>().unwrap();
for ts in ts_vector.iter_data().flatten() {
let aligned = align_timestamp(ts, durations_millis)
.context(TimestampOverflowSnafu { ts })?;
aligned_timestamps.insert(aligned);
}
}
_ => unreachable!(),
}
}
}
@@ -250,7 +266,7 @@ impl WriteRequest for WriteBatch {
/// So timestamp within `[i64::MIN, i64::MIN + duration)` or
/// `[i64::MAX-(i64::MAX%duration), i64::MAX]` is not a valid input.
fn align_timestamp(ts: i64, duration: i64) -> Option<i64> {
let aligned = TimestampMillis::new(ts).align_by_bucket(duration)?.as_i64();
let aligned = ts.align_by_bucket(duration)?.as_i64();
// Also ensure end timestamp won't overflow.
aligned.checked_add(duration)?;
Some(aligned)
@@ -955,7 +971,7 @@ mod tests {
#[test]
pub fn test_align_timestamp_overflow() {
assert_eq!(Some(i64::MIN), align_timestamp(i64::MIN, 1));
assert_eq!(None, align_timestamp(i64::MIN, 2));
assert_eq!(Some(-9223372036854775808), align_timestamp(i64::MIN, 2));
assert_eq!(
Some(((i64::MIN + 20) / 20 - 1) * 20),
align_timestamp(i64::MIN + 20, 20)