fix: insert into distributed table if partition column has a default value (#1498)

* fix: insert distributed table if partition column has default value

* Address review

* address review

* address review

* chore: introduce assert_columns

---------

Co-authored-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Niwaka
2023-05-02 21:50:02 +09:00
committed by GitHub
parent 6aae5b7286
commit d461328238
6 changed files with 223 additions and 46 deletions

View File

@@ -85,7 +85,7 @@ impl Table for DistTable {
let splits = self
.partition_manager
.split_insert_request(&self.table_name, request)
.split_insert_request(&self.table_name, request, &self.schema())
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;

View File

@@ -68,6 +68,20 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Failed to read column {}, could not create default value, source: {}",
column,
source
))]
CreateDefaultToRead {
column: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("The column '{}' does not have a default value.", column))]
MissingDefaultValue { column: String },
#[snafu(display("Expect {} region keys, actual {}", expect, actual))]
RegionKeysSize {
expect: usize,
@@ -136,6 +150,8 @@ impl ErrorExt for Error {
Error::InvalidTableRouteData { .. } => StatusCode::Internal,
Error::ConvertScalarValue { .. } => StatusCode::Internal,
Error::FindDatanode { .. } => StatusCode::InvalidArguments,
Error::CreateDefaultToRead { source, .. } => source.status_code(),
Error::MissingDefaultValue { .. } => StatusCode::Internal,
}
}

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_query::prelude::Expr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::prelude::Value;
use datatypes::schema::Schema;
use meta_client::rpc::{Peer, TableName, TableRoute};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
@@ -226,10 +227,11 @@ impl PartitionRuleManager {
&self,
table: &TableName,
req: InsertRequest,
schema: &Schema,
) -> Result<InsertRequestSplit> {
let partition_rule = self.find_table_partition_rule(table).await.unwrap();
let splitter = WriteSplitter::with_partition_rule(partition_rule);
splitter.split_insert(req)
splitter.split_insert(req, schema)
}
}

View File

@@ -16,15 +16,16 @@ use std::collections::HashMap;
use datatypes::data_type::DataType;
use datatypes::prelude::MutableVector;
use datatypes::schema::Schema;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use snafu::{ensure, OptionExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::requests::{DeleteRequest, InsertRequest};
use crate::error::{
FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu,
InvalidInsertRequestSnafu, Result,
CreateDefaultToReadSnafu, FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu,
InvalidInsertRequestSnafu, MissingDefaultValueSnafu, Result,
};
use crate::PartitionRuleRef;
@@ -42,12 +43,37 @@ impl WriteSplitter {
}
}
pub fn split_insert(&self, insert: InsertRequest) -> Result<InsertRequestSplit> {
check_req(&insert)?;
let column_names = self.partition_rule.partition_columns();
let values = &insert.columns_values;
let partition_columns = find_partitioning_values(values, &column_names)?;
pub fn split_insert(
&self,
insert: InsertRequest,
schema: &Schema,
) -> Result<InsertRequestSplit> {
let row_nums = check_req(&insert)?;
let mut insert = insert;
let partition_columns = self.partition_rule.partition_columns();
let missing_columns = schema
.column_schemas()
.iter()
.filter(|schema| {
partition_columns.contains(&schema.name)
&& !insert.columns_values.contains_key(&schema.name)
})
.collect::<Vec<_>>();
for column_schema in missing_columns {
let default_values = column_schema
.create_default_vector(row_nums)
.context(CreateDefaultToReadSnafu {
column: &column_schema.name,
})?
.context(MissingDefaultValueSnafu {
column: &column_schema.name,
})?;
insert
.columns_values
.insert(column_schema.name.clone(), default_values);
}
let partition_columns =
find_partitioning_values(&insert.columns_values, &partition_columns)?;
let region_map = self.split_partitioning_values(&partition_columns)?;
Ok(split_insert_request(&insert, region_map))
@@ -146,7 +172,7 @@ impl WriteSplitter {
}
}
fn check_req(insert: &InsertRequest) -> Result<()> {
fn check_req(insert: &InsertRequest) -> Result<usize> {
let mut len: Option<usize> = None;
for vector in insert.columns_values.values() {
match len {
@@ -159,7 +185,10 @@ fn check_req(insert: &InsertRequest) -> Result<()> {
None => len = Some(vector.len()),
}
}
Ok(())
let len = len.context(InvalidInsertRequestSnafu {
reason: "The columns in the insert statement are empty.",
})?;
Ok(len)
}
fn find_partitioning_values(
@@ -247,10 +276,11 @@ mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::types::StringType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema as DataTypesSchema};
use datatypes::types::{BooleanType, Int16Type, StringType};
use datatypes::value::Value;
use datatypes::vectors::{
BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVectorBuilder,
BooleanVectorBuilder, Int16VectorBuilder, MutableVector, StringVectorBuilder, Vector,
};
use serde::{Deserialize, Serialize};
use store_api::storage::RegionNumber;
@@ -266,18 +296,36 @@ mod tests {
let right = mock_insert_request();
let ret = check_req(&right);
assert!(ret.is_ok());
assert_eq!(ret.unwrap(), 3);
let wrong = mock_wrong_insert_request();
let ret = check_req(&wrong);
assert!(ret.is_err());
}
/// Asserts that, for every `(column_name, expected_values)` pair, the named
/// column exists in `columns` and holds exactly those values at rows
/// `0..expected_values.len()` (extra trailing rows are not checked).
fn assert_columns(columns: &HashMap<String, Arc<dyn Vector>>, expected: &[(&str, &[Value])]) {
    for (col_name, values) in expected {
        // Hoist the loop-invariant map lookup out of the inner loop, and fail
        // with the column name so a missing column is easy to diagnose.
        let column = columns
            .get(*col_name)
            .unwrap_or_else(|| panic!("column '{}' is missing", col_name));
        for (idx, value) in values.iter().enumerate() {
            assert_eq!(
                *value,
                column.get(idx),
                "unexpected value in column '{}' at row {}",
                col_name,
                idx
            );
        }
    }
}
#[test]
fn test_writer_spliter() {
let insert = mock_insert_request();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
let ret = spliter.split_insert(insert).unwrap();
let mock_schema = DataTypesSchema::new(vec![
ColumnSchema::new(
"enable_reboot",
ConcreteDataType::Boolean(BooleanType),
false,
),
ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false),
ColumnSchema::new("host", ConcreteDataType::String(StringType), true),
]);
let ret = spliter.split_insert(insert, &mock_schema).unwrap();
assert_eq!(2, ret.len());
@@ -289,41 +337,59 @@ mod tests {
let r1_columns = &r1_insert.columns_values;
assert_eq!(3, r1_columns.len());
assert_eq!(
<i16 as Into<Value>>::into(1),
r1_columns.get("id").unwrap().get(0)
);
assert_eq!(
<&str as Into<Value>>::into("host1"),
r1_columns.get("host").unwrap().get(0)
);
assert_eq!(
<bool as Into<Value>>::into(true),
r1_columns.get("enable_reboot").unwrap().get(0)
assert_columns(
r1_columns,
&[
("id", &[Value::from(1_i16)]),
("host", &[Value::from("host1")]),
("enable_reboot", &[Value::from(true)]),
],
);
let r2_columns = &r2_insert.columns_values;
assert_eq!(3, r2_columns.len());
assert_eq!(
<i16 as Into<Value>>::into(2),
r2_columns.get("id").unwrap().get(0)
assert_columns(
r2_columns,
&[
("id", &[Value::from(2_i16), Value::from(3_i16)]),
("host", &[Value::Null, Value::from("host3")]),
("enable_reboot", &[Value::from(false), Value::from(true)]),
],
);
assert_eq!(
<i16 as Into<Value>>::into(3),
r2_columns.get("id").unwrap().get(1)
);
assert_eq!(Value::Null, r2_columns.get("host").unwrap().get(0));
assert_eq!(
<&str as Into<Value>>::into("host3"),
r2_columns.get("host").unwrap().get(1)
);
assert_eq!(
<bool as Into<Value>>::into(false),
r2_columns.get("enable_reboot").unwrap().get(0)
);
assert_eq!(
<bool as Into<Value>>::into(true),
r2_columns.get("enable_reboot").unwrap().get(1)
}
#[test]
fn test_writer_spliter_without_partition_columns() {
    // The request deliberately omits the partition column "id"; the splitter
    // is expected to backfill it from the schema's default value and then
    // route every row to a single region.
    let (schema, request) = mock_schema_and_insert_request_without_partition_columns();
    let splitter =
        WriteSplitter::with_partition_rule(Arc::new(MockPartitionRule) as PartitionRuleRef);

    let splits = splitter.split_insert(request, &schema).unwrap();
    assert_eq!(1, splits.len());

    let region_insert = splits.get(&0).unwrap();
    assert_eq!("demo", region_insert.table_name);

    let columns = &region_insert.columns_values;
    assert_eq!(3, columns.len());
    // Every "id" row carries the schema default (1), the other columns keep
    // the values supplied by the request.
    assert_columns(
        columns,
        &[
            (
                "id",
                &[Value::from(1_i16), Value::from(1_i16), Value::from(1_i16)],
            ),
            (
                "host",
                &[Value::from("host1"), Value::Null, Value::from("host3")],
            ),
            (
                "enable_reboot",
                &[Value::from(true), Value::from(false), Value::from(true)],
            ),
        ],
    );
}
@@ -468,6 +534,45 @@ mod tests {
}
}
/// Builds a schema whose partition column "id" has a default value, together
/// with an insert request that intentionally leaves "id" out.
fn mock_schema_and_insert_request_without_partition_columns() -> (Schema, InsertRequest) {
    // Populate the two provided columns row by row; "id" is absent on purpose.
    let mut reboot_builder = BooleanVectorBuilder::with_capacity(3);
    for flag in [true, false, true] {
        reboot_builder.push(Some(flag));
    }
    let mut host_builder = StringVectorBuilder::with_capacity(3);
    for host in [Some("host1"), None, Some("host3")] {
        host_builder.push(host);
    }

    let mut columns_values = HashMap::with_capacity(4);
    columns_values.insert("enable_reboot".to_string(), reboot_builder.to_vector());
    columns_values.insert("host".to_string(), host_builder.to_vector());

    let insert_request = InsertRequest {
        catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
        schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
        table_name: "demo".to_string(),
        columns_values,
        region_number: 0,
    };

    // "id" carries a default constraint so the splitter can backfill it.
    let id_column = ColumnSchema::new("id", ConcreteDataType::Int16(Int16Type {}), false)
        .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(1_i16))))
        .unwrap();
    let mock_schema = DataTypesSchema::new(vec![
        ColumnSchema::new(
            "enable_reboot",
            ConcreteDataType::Boolean(BooleanType),
            false,
        ),
        id_column,
        ColumnSchema::new("host", ConcreteDataType::String(StringType), true),
    ]);

    (mock_schema, insert_request)
}
fn mock_wrong_insert_request() -> InsertRequest {
let mut columns_values = HashMap::with_capacity(4);
let mut builder = BooleanVectorBuilder::with_capacity(3);

View File

@@ -58,3 +58,39 @@ DROP TABLE system_metrics;
Affected Rows: 1
create table foo (
host string,
ts timestamp DEFAULT '2023-04-29 00:00:00+00:00',
cpu double default 0,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);
Affected Rows: 0
insert into foo (host, cpu, ts) values ('host1', 1.1, '2000-01-01 00:00:00+00:00');
Affected Rows: 1
insert into foo (host, cpu) values ('host2', 2.2);
Affected Rows: 1
insert into foo (host) values ('host3');
Affected Rows: 1
select * from foo;
+-------+---------------------+-----+
| host | ts | cpu |
+-------+---------------------+-----+
| host1 | 2000-01-01T00:00:00 | 1.1 |
| host2 | 2023-04-29T00:00:00 | 2.2 |
| host3 | 2023-04-29T00:00:00 | 0.0 |
+-------+---------------------+-----+
DROP TABLE foo;
Affected Rows: 1

View File

@@ -24,3 +24,21 @@ SELECT avg(cpu_util) FROM system_metrics;
SELECT idc, avg(memory_util) FROM system_metrics GROUP BY idc ORDER BY idc;
DROP TABLE system_metrics;
create table foo (
host string,
ts timestamp DEFAULT '2023-04-29 00:00:00+00:00',
cpu double default 0,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);
insert into foo (host, cpu, ts) values ('host1', 1.1, '2000-01-01 00:00:00+00:00');
insert into foo (host, cpu) values ('host2', 2.2);
insert into foo (host) values ('host3');
select * from foo;
DROP TABLE foo;