fix: CURRENT_TIMESTAMP supports int64 type (#436)

* fix: Fix int64 type not considered in DEFAULT CURRENT_TIMESTAMP() constraint

Also avoid using `ConstantVector` in default constraint, as other user
may try to downcast it to a concrete type, and sometimes may forget to
check whether it is a constant vector.

* test: Add test for writing default value
This commit is contained in:
Yingwen
2022-11-10 11:35:16 +08:00
committed by GitHub
parent c3776ddd18
commit cefdffff09
3 changed files with 132 additions and 38 deletions

View File

@@ -92,6 +92,7 @@ pub enum Error {
#[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))]
Insert {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},

View File

@@ -1,6 +1,8 @@
use arrow::array::{Int64Array, UInt64Array};
use common_query::Output;
use common_recordbatch::util;
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow_array::StringArray;
use datatypes::prelude::ConcreteDataType;
@@ -240,12 +242,24 @@ pub async fn test_create_table_illegal_timestamp_type() {
}
}
async fn check_output_stream(output: Output, expected: Vec<&str>) {
match output {
Output::Stream(stream) => {
let recordbatches = util::collect(stream).await.unwrap();
let recordbatch = recordbatches
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<DfRecordBatch>>();
let pretty_print = arrow_print::write(&recordbatch);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
assert_eq!(pretty_print, expected);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_alter_table() {
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
// TODO(LFC) Use real Mito engine when we can alter its region schema,
// and delete the `new_mock` method.
let instance = Instance::new_mock().await.unwrap();
instance.start().await.unwrap();
@@ -278,26 +292,69 @@ async fn test_alter_table() {
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance.execute_sql("select * from demo").await.unwrap();
match output {
Output::Stream(stream) => {
let recordbatches = util::collect(stream).await.unwrap();
let recordbatch = recordbatches
.into_iter()
.map(|r| r.df_recordbatch)
.collect::<Vec<DfRecordBatch>>();
let pretty_print = arrow_print::write(&recordbatch);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
"+-------+-----+--------+---------------------+--------+",
"| host | cpu | memory | ts | my_tag |",
"+-------+-----+--------+---------------------+--------+",
"| host1 | 1.1 | 100 | 1970-01-01 00:00:01 | |",
"| host2 | 2.2 | 200 | 1970-01-01 00:00:02 | hello |",
"| host3 | 3.3 | 300 | 1970-01-01 00:00:03 | |",
"+-------+-----+--------+---------------------+--------+",
];
assert_eq!(pretty_print, expected);
}
_ => unreachable!(),
}
let expected = vec![
"+-------+-----+--------+---------------------+--------+",
"| host | cpu | memory | ts | my_tag |",
"+-------+-----+--------+---------------------+--------+",
"| host1 | 1.1 | 100 | 1970-01-01 00:00:01 | |",
"| host2 | 2.2 | 200 | 1970-01-01 00:00:02 | hello |",
"| host3 | 3.3 | 300 | 1970-01-01 00:00:03 | |",
"+-------+-----+--------+---------------------+--------+",
];
check_output_stream(output, expected).await;
}
async fn test_insert_with_default_value_for_type(type_name: &str) {
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_create");
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let create_sql = format!(
r#"create table test_table(
host string,
ts {} DEFAULT CURRENT_TIMESTAMP,
cpu double default 0,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
type_name
);
let output = instance.execute_sql(&create_sql).await.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
// Insert with ts.
instance
.execute_sql("insert into test_table(host, cpu, ts) values ('host1', 1.1, 1000)")
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
// Insert without ts, so it should be filled by default value.
let output = instance
.execute_sql("insert into test_table(host, cpu) values ('host2', 2.2)")
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.execute_sql("select host, cpu from test_table")
.await
.unwrap();
let expected = vec![
"+-------+-----+",
"| host | cpu |",
"+-------+-----+",
"| host1 | 1.1 |",
"| host2 | 2.2 |",
"+-------+-----+",
];
check_output_stream(output, expected).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_with_default_value() {
common_telemetry::init_default_ut_logging();
test_insert_with_default_value_for_type("timestamp").await;
test_insert_with_default_value_for_type("bigint").await;
}

View File

@@ -1,14 +1,13 @@
use std::sync::Arc;
use common_time::{util, Timestamp};
use common_time::util;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Result};
use crate::scalars::ScalarVector;
use crate::value::Value;
use crate::vectors::{ConstantVector, TimestampVector, VectorRef};
use crate::vectors::{Int64Vector, TimestampVector, VectorRef};
const CURRENT_TIMESTAMP: &str = "current_timestamp()";
@@ -107,15 +106,7 @@ impl ColumnDefaultConstraint {
match &expr[..] {
// TODO(dennis): we only supports current_timestamp right now,
// it's better to use a expression framework in future.
CURRENT_TIMESTAMP => {
// TODO(yingwen): We should coerce the type to the physical type of
// input `data_type`.
let vector =
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
util::current_time_millis(),
)]));
Ok(Arc::new(ConstantVector::new(vector, num_rows)))
}
CURRENT_TIMESTAMP => create_current_timestamp_vector(data_type, num_rows),
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
@@ -143,9 +134,31 @@ impl ColumnDefaultConstraint {
}
}
fn create_current_timestamp_vector(
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<VectorRef> {
match data_type {
ConcreteDataType::Timestamp(_) => Ok(Arc::new(TimestampVector::from_values(
std::iter::repeat(util::current_time_millis()).take(num_rows),
))),
ConcreteDataType::Int64(_) => Ok(Arc::new(Int64Vector::from_values(
std::iter::repeat(util::current_time_millis()).take(num_rows),
))),
_ => error::DefaultValueTypeSnafu {
reason: format!(
"Not support to assign current timestamp to {:?} type",
data_type
),
}
.fail(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
use crate::vectors::Int32Vector;
#[test]
@@ -224,6 +237,7 @@ mod tests {
#[test]
fn test_create_default_vector_by_func() {
let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string());
// Timestamp type.
let data_type = ConcreteDataType::timestamp_millis_datatype();
let v = constraint
.create_default_vector(&data_type, false, 4)
@@ -235,10 +249,32 @@ mod tests {
v.get(0)
);
// Int64 type.
let data_type = ConcreteDataType::int64_datatype();
let v = constraint
.create_default_vector(&data_type, false, 4)
.unwrap();
assert_eq!(4, v.len());
assert!(
matches!(v.get(0), Value::Int64(_)),
"v {:?} is not timestamp",
v.get(0)
);
let constraint = ColumnDefaultConstraint::Function("no".to_string());
let data_type = ConcreteDataType::timestamp_millis_datatype();
constraint
.create_default_vector(&data_type, false, 4)
.unwrap_err();
}
#[test]
fn test_create_by_func_and_invalid_type() {
let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string());
let data_type = ConcreteDataType::boolean_datatype();
let err = constraint
.create_default_vector(&data_type, false, 4)
.unwrap_err();
assert!(matches!(err, Error::DefaultValueType { .. }), "{:?}", err);
}
}