From d46132823837452d5e8e5be7ac9aef292c970bbf Mon Sep 17 00:00:00 2001
From: Niwaka <61189782+NiwakaDev@users.noreply.github.com>
Date: Tue, 2 May 2023 21:50:02 +0900
Subject: [PATCH] fix: insert distributed table if partition column has default
 value (#1498)

* fix: insert distributed table if partition column has default value

* Address review

* address review

* address review

* chore: introduce assert_columns

---------

Co-authored-by: WenyXu
---
 src/frontend/src/table.rs                  |   2 +-
 src/partition/src/error.rs                 |  16 ++
 src/partition/src/manager.rs               |   4 +-
 src/partition/src/splitter.rs              | 193 ++++++++++++++++-----
 tests/cases/standalone/common/basic.result |  36 ++++
 tests/cases/standalone/common/basic.sql    |  18 ++
 6 files changed, 223 insertions(+), 46 deletions(-)

diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 293714152b..89565a07bd 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -85,7 +85,7 @@ impl Table for DistTable {
         let splits = self
             .partition_manager
-            .split_insert_request(&self.table_name, request)
+            .split_insert_request(&self.table_name, request, &self.schema())
             .await
             .map_err(BoxedError::new)
             .context(TableOperationSnafu)?;
diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs
index 5ffcd6a001..95fb613950 100644
--- a/src/partition/src/error.rs
+++ b/src/partition/src/error.rs
@@ -68,6 +68,20 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display(
+        "Failed to read column {}, could not create default value, source: {}",
+        column,
+        source
+    ))]
+    CreateDefaultToRead {
+        column: String,
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("The column '{}' does not have a default value.", column))]
+    MissingDefaultValue { column: String },
+
     #[snafu(display("Expect {} region keys, actual {}", expect, actual))]
     RegionKeysSize {
         expect: usize,
@@ -136,6 +150,8 @@ impl ErrorExt for Error {
             Error::InvalidTableRouteData { .. } => StatusCode::Internal,
             Error::ConvertScalarValue { .. } => StatusCode::Internal,
             Error::FindDatanode { .. } => StatusCode::InvalidArguments,
+            Error::CreateDefaultToRead { source, .. } => source.status_code(),
+            Error::MissingDefaultValue { .. } => StatusCode::Internal,
         }
     }
diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs
index 9e8b0b4446..3e3fcd64c2 100644
--- a/src/partition/src/manager.rs
+++ b/src/partition/src/manager.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 use common_query::prelude::Expr;
 use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
 use datatypes::prelude::Value;
+use datatypes::schema::Schema;
 use meta_client::rpc::{Peer, TableName, TableRoute};
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::{RegionId, RegionNumber};
@@ -226,10 +227,11 @@ impl PartitionRuleManager {
         &self,
         table: &TableName,
         req: InsertRequest,
+        schema: &Schema,
     ) -> Result {
         let partition_rule = self.find_table_partition_rule(table).await.unwrap();
         let splitter = WriteSplitter::with_partition_rule(partition_rule);
-        splitter.split_insert(req)
+        splitter.split_insert(req, schema)
     }
 }
diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs
index eed4b4af7b..3634d57546 100644
--- a/src/partition/src/splitter.rs
+++ b/src/partition/src/splitter.rs
@@ -16,15 +16,16 @@
 use std::collections::HashMap;
 
 use datatypes::data_type::DataType;
 use datatypes::prelude::MutableVector;
+use datatypes::schema::Schema;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
-use snafu::{ensure, OptionExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionNumber;
 use table::requests::{DeleteRequest, InsertRequest};
 
 use crate::error::{
-    FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu,
-    InvalidInsertRequestSnafu, Result,
+    CreateDefaultToReadSnafu, FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu,
+    InvalidInsertRequestSnafu, MissingDefaultValueSnafu, Result,
 };
 use crate::PartitionRuleRef;
@@ -42,12 +43,37 @@ impl WriteSplitter {
         }
     }
 
-    pub fn split_insert(&self, insert: InsertRequest) -> Result {
-        check_req(&insert)?;
-
-        let column_names = self.partition_rule.partition_columns();
-        let values = &insert.columns_values;
-        let partition_columns = find_partitioning_values(values, &column_names)?;
+    pub fn split_insert(
+        &self,
+        insert: InsertRequest,
+        schema: &Schema,
+    ) -> Result {
+        let row_nums = check_req(&insert)?;
+        let mut insert = insert;
+        let partition_columns = self.partition_rule.partition_columns();
+        let missing_columns = schema
+            .column_schemas()
+            .iter()
+            .filter(|schema| {
+                partition_columns.contains(&schema.name)
+                    && !insert.columns_values.contains_key(&schema.name)
+            })
+            .collect::<Vec<_>>();
+        for column_schema in missing_columns {
+            let default_values = column_schema
+                .create_default_vector(row_nums)
+                .context(CreateDefaultToReadSnafu {
+                    column: &column_schema.name,
+                })?
+                .context(MissingDefaultValueSnafu {
+                    column: &column_schema.name,
+                })?;
+            insert
+                .columns_values
+                .insert(column_schema.name.clone(), default_values);
+        }
+        let partition_columns =
+            find_partitioning_values(&insert.columns_values, &partition_columns)?;
 
         let region_map = self.split_partitioning_values(&partition_columns)?;
         Ok(split_insert_request(&insert, region_map))
@@ -146,7 +172,7 @@ impl WriteSplitter {
     }
 }
 
-fn check_req(insert: &InsertRequest) -> Result<()> {
+fn check_req(insert: &InsertRequest) -> Result<usize> {
     let mut len: Option<usize> = None;
     for vector in insert.columns_values.values() {
         match len {
@@ -159,7 +185,10 @@ fn check_req(insert: &InsertRequest) -> Result<()> {
             None => len = Some(vector.len()),
         }
     }
-    Ok(())
+    let len = len.context(InvalidInsertRequestSnafu {
+        reason: "The columns in the insert statement are empty.",
+    })?;
+    Ok(len)
 }
 
 fn find_partitioning_values(
@@ -247,10 +276,11 @@ mod tests {
     use datatypes::data_type::ConcreteDataType;
     use datatypes::prelude::ScalarVectorBuilder;
-    use datatypes::types::StringType;
+    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema as DataTypesSchema};
+    use datatypes::types::{BooleanType, Int16Type, StringType};
     use datatypes::value::Value;
     use datatypes::vectors::{
-        BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVectorBuilder,
+        BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVectorBuilder, Vector,
     };
     use serde::{Deserialize, Serialize};
     use store_api::storage::RegionNumber;
@@ -266,18 +296,36 @@ mod tests {
         let right = mock_insert_request();
         let ret = check_req(&right);
         assert!(ret.is_ok());
+        assert_eq!(ret.unwrap(), 3);
 
         let wrong = mock_wrong_insert_request();
         let ret = check_req(&wrong);
         assert!(ret.is_err());
     }
 
+    fn assert_columns(columns: &HashMap<String, VectorRef>, expected: &[(&str, &[Value])]) {
+        for (col_name, values) in expected {
+            for (idx, value) in values.iter().enumerate() {
+                assert_eq!(*value, columns.get(*col_name).unwrap().get(idx));
+            }
+        }
+    }
+
     #[test]
     fn test_writer_spliter() {
         let insert = mock_insert_request();
         let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
         let spliter = WriteSplitter::with_partition_rule(rule);
-        let ret = spliter.split_insert(insert).unwrap();
+        let mock_schema = DataTypesSchema::new(vec![
+            ColumnSchema::new(
+                "enable_reboot",
+                ConcreteDataType::Boolean(BooleanType),
+                false,
+            ),
+            ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false),
+            ColumnSchema::new("host", ConcreteDataType::String(StringType), true),
+        ]);
+        let ret = spliter.split_insert(insert, &mock_schema).unwrap();
 
         assert_eq!(2, ret.len());
@@ -289,41 +337,59 @@ mod tests {
         let r1_columns = &r1_insert.columns_values;
         assert_eq!(3, r1_columns.len());
-        assert_eq!(
-            <i16 as Into<Value>>::into(1),
-            r1_columns.get("id").unwrap().get(0)
-        );
-        assert_eq!(
-            <&str as Into<Value>>::into("host1"),
-            r1_columns.get("host").unwrap().get(0)
-        );
-        assert_eq!(
-            <bool as Into<Value>>::into(true),
-            r1_columns.get("enable_reboot").unwrap().get(0)
+        assert_columns(
+            r1_columns,
+            &[
+                ("id", &[Value::from(1_i16)]),
+                ("host", &[Value::from("host1")]),
+                ("enable_reboot", &[Value::from(true)]),
+            ],
         );
 
         let r2_columns = &r2_insert.columns_values;
         assert_eq!(3, r2_columns.len());
-        assert_eq!(
-            <i16 as Into<Value>>::into(2),
-            r2_columns.get("id").unwrap().get(0)
+
+        assert_columns(
+            r2_columns,
+            &[
+                ("id", &[Value::from(2_i16), Value::from(3_i16)]),
+                ("host", &[Value::Null, Value::from("host3")]),
+                ("enable_reboot", &[Value::from(false), Value::from(true)]),
+            ],
         );
-        assert_eq!(
-            <i16 as Into<Value>>::into(3),
-            r2_columns.get("id").unwrap().get(1)
-        );
-        assert_eq!(Value::Null, r2_columns.get("host").unwrap().get(0));
-        assert_eq!(
-            <&str as Into<Value>>::into("host3"),
-            r2_columns.get("host").unwrap().get(1)
-        );
-        assert_eq!(
-            <bool as Into<Value>>::into(false),
-            r2_columns.get("enable_reboot").unwrap().get(0)
-        );
-        assert_eq!(
-            <bool as Into<Value>>::into(true),
-            r2_columns.get("enable_reboot").unwrap().get(1)
+    }
+
+    #[test]
+    fn test_writer_spliter_without_partition_columns() {
+        let (mock_schema, insert) = mock_schema_and_insert_request_without_partition_columns();
+        let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
+        let spliter = WriteSplitter::with_partition_rule(rule);
+        let ret = spliter.split_insert(insert, &mock_schema).unwrap();
+
+        assert_eq!(1, ret.len());
+
+        let r1_insert = ret.get(&0).unwrap();
+
+        assert_eq!("demo", r1_insert.table_name);
+
+        let r1_columns = &r1_insert.columns_values;
+        assert_eq!(3, r1_columns.len());
+        assert_columns(
+            r1_columns,
+            &[
+                (
+                    "id",
+                    &[Value::from(1_i16), Value::from(1_i16), Value::from(1_i16)],
+                ),
+                (
+                    "host",
+                    &[Value::from("host1"), Value::Null, Value::from("host3")],
+                ),
+                (
+                    "enable_reboot",
+                    &[Value::from(true), Value::from(false), Value::from(true)],
+                ),
+            ],
         );
     }
 
@@ -468,6 +534,45 @@ mod tests {
         }
     }
 
+    fn mock_schema_and_insert_request_without_partition_columns() -> (Schema, InsertRequest) {
+        let mut columns_values = HashMap::with_capacity(4);
+        let mut builder = BooleanVectorBuilder::with_capacity(3);
+        builder.push(Some(true));
+        builder.push(Some(false));
+        builder.push(Some(true));
+        columns_values.insert("enable_reboot".to_string(), builder.to_vector());
+
+        let mut builder = StringVectorBuilder::with_capacity(3);
+        builder.push(Some("host1"));
+        builder.push(None);
+        builder.push(Some("host3"));
+        columns_values.insert("host".to_string(), builder.to_vector());
+
+        let insert_request = InsertRequest {
+            catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
+            schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
+            table_name: "demo".to_string(),
+            columns_values,
+            region_number: 0,
+        };
+
+        let id_column = ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false);
+        let id_column = id_column
+            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(1_i16))))
+            .unwrap();
+        let mock_schema = DataTypesSchema::new(vec![
+            ColumnSchema::new(
+                "enable_reboot",
+                ConcreteDataType::Boolean(BooleanType),
+                false,
+            ),
+            id_column,
+            ColumnSchema::new("host", ConcreteDataType::String(StringType), true),
+        ]);
+
+        (mock_schema, insert_request)
+    }
+
     fn mock_wrong_insert_request() -> InsertRequest {
         let mut columns_values = HashMap::with_capacity(4);
         let mut builder = BooleanVectorBuilder::with_capacity(3);
diff --git a/tests/cases/standalone/common/basic.result b/tests/cases/standalone/common/basic.result
index 2d9446b4fb..19ffd297ec 100644
--- a/tests/cases/standalone/common/basic.result
+++ b/tests/cases/standalone/common/basic.result
@@ -58,3 +58,39 @@ DROP TABLE system_metrics;
 
 Affected Rows: 1
 
+create table foo (
+    host string,
+    ts timestamp DEFAULT '2023-04-29 00:00:00+00:00',
+    cpu double default 0,
+    TIME INDEX (ts),
+    PRIMARY KEY(host)
+) engine=mito with(regions=1);
+
+Affected Rows: 0
+
+insert into foo (host, cpu, ts) values ('host1', 1.1, '2000-01-01 00:00:00+00:00');
+
+Affected Rows: 1
+
+insert into foo (host, cpu) values ('host2', 2.2);
+
+Affected Rows: 1
+
+insert into foo (host) values ('host3');
+
+Affected Rows: 1
+
+select * from foo;
+
++-------+---------------------+-----+
+| host  | ts                  | cpu |
++-------+---------------------+-----+
+| host1 | 2000-01-01T00:00:00 | 1.1 |
+| host2 | 2023-04-29T00:00:00 | 2.2 |
+| host3 | 2023-04-29T00:00:00 | 0.0 |
++-------+---------------------+-----+
+
+DROP TABLE foo;
+
+Affected Rows: 1
+
diff --git a/tests/cases/standalone/common/basic.sql b/tests/cases/standalone/common/basic.sql
index d08a78a6a6..d425b66343 100644
--- a/tests/cases/standalone/common/basic.sql
+++ b/tests/cases/standalone/common/basic.sql
@@ -24,3 +24,21 @@ SELECT avg(cpu_util) FROM system_metrics;
 SELECT idc, avg(memory_util) FROM system_metrics GROUP BY idc ORDER BY idc;
 
 DROP TABLE system_metrics;
+
+create table foo (
+    host string,
+    ts timestamp DEFAULT '2023-04-29 00:00:00+00:00',
+    cpu double default 0,
+    TIME INDEX (ts),
+    PRIMARY KEY(host)
+) engine=mito with(regions=1);
+
+insert into foo (host, cpu, ts) values ('host1', 1.1, '2000-01-01 00:00:00+00:00');
+
+insert into foo (host, cpu) values ('host2', 2.2);
+
+insert into foo (host) values ('host3');
+
+select * from foo;
+
+DROP TABLE foo;
\ No newline at end of file