diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index c2c4adacda..b8ec95b9be 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -92,6 +92,7 @@ pub enum Error { #[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))] Insert { table_name: String, + #[snafu(backtrace)] source: TableError, }, diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 4bbaf9d14d..c542c4cd35 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -1,6 +1,8 @@ use arrow::array::{Int64Array, UInt64Array}; use common_query::Output; use common_recordbatch::util; +use datafusion::arrow_print; +use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow_array::StringArray; use datatypes::prelude::ConcreteDataType; @@ -240,12 +242,24 @@ pub async fn test_create_table_illegal_timestamp_type() { } } +async fn check_output_stream(output: Output, expected: Vec<&str>) { + match output { + Output::Stream(stream) => { + let recordbatches = util::collect(stream).await.unwrap(); + let recordbatch = recordbatches + .into_iter() + .map(|r| r.df_recordbatch) + .collect::>(); + let pretty_print = arrow_print::write(&recordbatch); + let pretty_print = pretty_print.lines().collect::>(); + assert_eq!(pretty_print, expected); + } + _ => unreachable!(), + } +} + #[tokio::test] async fn test_alter_table() { - use datafusion::arrow_print; - use datafusion_common::record_batch::RecordBatch as DfRecordBatch; - // TODO(LFC) Use real Mito engine when we can alter its region schema, - // and delete the `new_mock` method. let instance = Instance::new_mock().await.unwrap(); instance.start().await.unwrap(); @@ -278,26 +292,69 @@ async fn test_alter_table() { assert!(matches!(output, Output::AffectedRows(1))); let output = instance.execute_sql("select * from demo").await.unwrap(); - match output { - Output::Stream(stream) => { - let recordbatches = util::collect(stream).await.unwrap(); - let recordbatch = recordbatches - .into_iter() - .map(|r| r.df_recordbatch) - .collect::>(); - let pretty_print = arrow_print::write(&recordbatch); - let pretty_print = pretty_print.lines().collect::>(); - let expected = vec![ - "+-------+-----+--------+---------------------+--------+", - "| host | cpu | memory | ts | my_tag |", - "+-------+-----+--------+---------------------+--------+", - "| host1 | 1.1 | 100 | 1970-01-01 00:00:01 | |", - "| host2 | 2.2 | 200 | 1970-01-01 00:00:02 | hello |", - "| host3 | 3.3 | 300 | 1970-01-01 00:00:03 | |", - "+-------+-----+--------+---------------------+--------+", - ]; - assert_eq!(pretty_print, expected); - } - _ => unreachable!(), - } + let expected = vec![ + "+-------+-----+--------+---------------------+--------+", + "| host | cpu | memory | ts | my_tag |", + "+-------+-----+--------+---------------------+--------+", + "| host1 | 1.1 | 100 | 1970-01-01 00:00:01 | |", + "| host2 | 2.2 | 200 | 1970-01-01 00:00:02 | hello |", + "| host3 | 3.3 | 300 | 1970-01-01 00:00:03 | |", + "+-------+-----+--------+---------------------+--------+", + ]; + check_output_stream(output, expected).await; +} + +async fn test_insert_with_default_value_for_type(type_name: &str) { + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_create"); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); + instance.start().await.unwrap(); + + let create_sql = format!( + r#"create table test_table( + host string, + ts {} DEFAULT CURRENT_TIMESTAMP, + cpu double default 0, + TIME INDEX (ts), + PRIMARY KEY(host) + ) engine=mito with(regions=1);"#, + type_name + ); + let output = instance.execute_sql(&create_sql).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + // Insert with ts. + instance + .execute_sql("insert into test_table(host, cpu, ts) values ('host1', 1.1, 1000)") + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + // Insert without ts, so it should be filled by default value. + let output = instance + .execute_sql("insert into test_table(host, cpu) values ('host2', 2.2)") + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let output = instance + .execute_sql("select host, cpu from test_table") + .await + .unwrap(); + let expected = vec![ + "+-------+-----+", + "| host | cpu |", + "+-------+-----+", + "| host1 | 1.1 |", + "| host2 | 2.2 |", + "+-------+-----+", + ]; + check_output_stream(output, expected).await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_insert_with_default_value() { + common_telemetry::init_default_ut_logging(); + + test_insert_with_default_value_for_type("timestamp").await; + test_insert_with_default_value_for_type("bigint").await; } diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs index 8f54312d12..50d5880339 100644 --- a/src/datatypes/src/schema/constraint.rs +++ b/src/datatypes/src/schema/constraint.rs @@ -1,14 +1,13 @@ use std::sync::Arc; -use common_time::{util, Timestamp}; +use common_time::util; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Result}; -use crate::scalars::ScalarVector; use crate::value::Value; -use crate::vectors::{ConstantVector, TimestampVector, VectorRef}; +use crate::vectors::{Int64Vector, TimestampVector, VectorRef}; const CURRENT_TIMESTAMP: &str = "current_timestamp()"; @@ -107,15 +106,7 @@ impl ColumnDefaultConstraint { match &expr[..] { // TODO(dennis): we only supports current_timestamp right now, // it's better to use a expression framework in future. - CURRENT_TIMESTAMP => { - // TODO(yingwen): We should coerce the type to the physical type of - // input `data_type`. - let vector = - Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis( - util::current_time_millis(), - )])); - Ok(Arc::new(ConstantVector::new(vector, num_rows))) - } + CURRENT_TIMESTAMP => create_current_timestamp_vector(data_type, num_rows), _ => error::UnsupportedDefaultExprSnafu { expr }.fail(), } } @@ -143,9 +134,31 @@ impl ColumnDefaultConstraint { } } +fn create_current_timestamp_vector( + data_type: &ConcreteDataType, + num_rows: usize, +) -> Result { + match data_type { + ConcreteDataType::Timestamp(_) => Ok(Arc::new(TimestampVector::from_values( + std::iter::repeat(util::current_time_millis()).take(num_rows), + ))), + ConcreteDataType::Int64(_) => Ok(Arc::new(Int64Vector::from_values( + std::iter::repeat(util::current_time_millis()).take(num_rows), + ))), + _ => error::DefaultValueTypeSnafu { + reason: format!( + "Not support to assign current timestamp to {:?} type", + data_type + ), + } + .fail(), + } +} + #[cfg(test)] mod tests { use super::*; + use crate::error::Error; use crate::vectors::Int32Vector; #[test] @@ -224,6 +237,7 @@ mod tests { #[test] fn test_create_default_vector_by_func() { let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string()); + // Timestamp type. let data_type = ConcreteDataType::timestamp_millis_datatype(); let v = constraint .create_default_vector(&data_type, false, 4) @@ -235,10 +249,32 @@ mod tests { v.get(0) ); + // Int64 type. + let data_type = ConcreteDataType::int64_datatype(); + let v = constraint + .create_default_vector(&data_type, false, 4) + .unwrap(); + assert_eq!(4, v.len()); + assert!( + matches!(v.get(0), Value::Int64(_)), + "v {:?} is not timestamp", + v.get(0) + ); + let constraint = ColumnDefaultConstraint::Function("no".to_string()); let data_type = ConcreteDataType::timestamp_millis_datatype(); constraint .create_default_vector(&data_type, false, 4) .unwrap_err(); } + + #[test] + fn test_create_by_func_and_invalid_type() { + let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string()); + let data_type = ConcreteDataType::boolean_datatype(); + let err = constraint + .create_default_vector(&data_type, false, 4) + .unwrap_err(); + assert!(matches!(err, Error::DefaultValueType { .. }), "{:?}", err); + } }