feat: support default value when inserting data (#854)

This commit is contained in:
Zheming Li
2023-01-13 14:49:05 +08:00
committed by GitHub
parent e428a84446
commit 0959c1d16b
8 changed files with 224 additions and 52 deletions

View File

@@ -295,6 +295,26 @@ pub enum Error {
#[snafu(display("Cannot find requested database: {}-{}", catalog, schema))]
DatabaseNotFound { catalog: String, schema: String },
#[snafu(display(
"Failed to build default value, column: {}, source: {}",
column,
source
))]
ColumnDefaultValue {
column: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"No valid default value can be build automatically, column: {}",
column,
))]
ColumnNoneDefaultValue {
column: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -363,6 +383,8 @@ impl ErrorExt for Error {
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -15,7 +15,7 @@
use catalog::CatalogManagerRef;
use common_query::Output;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::MutableVector;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
@@ -25,11 +25,14 @@ use table::engine::TableReference;
use table::requests::*;
use crate::error::{
CatalogSnafu, ColumnNotFoundSnafu, ColumnValuesNumberMismatchSnafu, InsertSnafu,
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu,
ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlSnafu, ParseSqlValueSnafu, Result,
TableNotFoundSnafu,
};
use crate::sql::{SqlHandler, SqlRequest};
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
impl SqlHandler {
pub(crate) async fn insert(&self, req: InsertRequest) -> Result<Output> {
// FIXME(dennis): table_ref is used in InsertSnafu and the req is consumed
@@ -72,17 +75,13 @@ impl SqlHandler {
};
let rows_num = values.len();
let mut columns_builders: Vec<(&String, &ConcreteDataType, Box<dyn MutableVector>)> =
let mut columns_builders: Vec<(&ColumnSchema, Box<dyn MutableVector>)> =
Vec::with_capacity(columns_num);
if columns.is_empty() {
for column_schema in schema.column_schemas() {
let data_type = &column_schema.data_type;
columns_builders.push((
&column_schema.name,
data_type,
data_type.create_mutable_vector(rows_num),
));
columns_builders.push((column_schema, data_type.create_mutable_vector(rows_num)));
}
} else {
for column_name in columns {
@@ -94,11 +93,7 @@ impl SqlHandler {
}
})?;
let data_type = &column_schema.data_type;
columns_builders.push((
column_name,
data_type,
data_type.create_mutable_vector(rows_num),
));
columns_builders.push((column_schema, data_type.create_mutable_vector(rows_num)));
}
}
@@ -112,10 +107,8 @@ impl SqlHandler {
}
);
for (sql_val, (column_name, data_type, builder)) in
row.iter().zip(columns_builders.iter_mut())
{
add_row_to_vector(column_name, data_type, sql_val, builder)?;
for (sql_val, (column_schema, builder)) in row.iter().zip(columns_builders.iter_mut()) {
add_row_to_vector(column_schema, sql_val, builder)?;
}
}
@@ -125,21 +118,34 @@ impl SqlHandler {
table_name: table_ref.table.to_string(),
columns_values: columns_builders
.into_iter()
.map(|(c, _, mut b)| (c.to_owned(), b.to_vector()))
.map(|(cs, mut b)| (cs.name.to_string(), b.to_vector()))
.collect(),
}))
}
}
fn add_row_to_vector(
column_name: &str,
data_type: &ConcreteDataType,
column_schema: &ColumnSchema,
sql_val: &SqlValue,
builder: &mut Box<dyn MutableVector>,
) -> Result<()> {
let value = statements::sql_value_to_value(column_name, data_type, sql_val)
.context(ParseSqlValueSnafu)?;
let value = if replace_default(sql_val) {
column_schema
.create_default()
.context(ColumnDefaultValueSnafu {
column: column_schema.name.to_string(),
})?
.context(ColumnNoneDefaultValueSnafu {
column: column_schema.name.to_string(),
})?
} else {
statements::sql_value_to_value(&column_schema.name, &column_schema.data_type, sql_val)
.context(ParseSqlSnafu)?
};
builder.push_value_ref(value.as_value_ref()).unwrap();
Ok(())
}
fn replace_default(sql_val: &SqlValue) -> bool {
matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
}

View File

@@ -150,6 +150,14 @@ impl ColumnSchema {
}
mutable_vector.to_vector()
}
/// Creates a default value for this column.
///
/// If the column is `NOT NULL` but doesn't has `DEFAULT` value supplied, returns `Ok(None)`.
pub fn create_default(&self) -> Result<Option<Value>> {
self.create_default_vector(1)
.map(|vec_ref_option| vec_ref_option.map(|vec_ref| vec_ref.get(0)))
}
}
impl TryFrom<&Field> for ColumnSchema {
@@ -337,4 +345,34 @@ mod tests {
let expect: VectorRef = Arc::new(Int32Vector::from_slice(&[0, 0, 0, 0]));
assert_eq!(expect, vector);
}
#[test]
fn test_column_schema_single_create_default_null() {
// Implicit default null.
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
let v = column_schema.create_default().unwrap().unwrap();
assert!(v.is_null());
// Explicit default null.
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
.unwrap();
let v = column_schema.create_default().unwrap().unwrap();
assert!(v.is_null());
}
#[test]
fn test_column_schema_single_create_default_not_null() {
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(6))))
.unwrap();
let v = column_schema.create_default().unwrap().unwrap();
assert_eq!(v, Value::Int32(6));
}
#[test]
fn test_column_schema_single_no_default() {
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
assert!(column_schema.create_default().unwrap().is_none());
}
}

View File

@@ -378,6 +378,26 @@ pub enum Error {
#[snafu(backtrace)]
source: common_grpc_expr::error::Error,
},
#[snafu(display(
"Failed to build default value, column: {}, source: {}",
column,
source
))]
ColumnDefaultValue {
column: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"No valid default value can be build automatically, column: {}",
column,
))]
ColumnNoneDefaultValue {
column: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -457,6 +477,8 @@ impl ErrorExt for Error {
Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
Error::BuildVector { source, .. } => source.status_code(),
Error::InvokeDatanode { source } => source.status_code(),
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -14,7 +14,8 @@
use common_error::snafu::ensure;
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::prelude::MutableVector;
use datatypes::schema::ColumnSchema;
use snafu::{OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
use sql::statements;
@@ -22,7 +23,9 @@ use sql::statements::insert::Insert;
use table::requests::InsertRequest;
use table::TableRef;
use crate::error::{self, BuildVectorSnafu, Result};
use crate::error::{self, Result};
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
// TODO(fys): Extract the common logic in datanode and frontend in the future.
#[allow(dead_code)]
@@ -40,17 +43,13 @@ pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result<Insert
};
let rows_num = values.len();
let mut columns_builders: Vec<(&String, &ConcreteDataType, Box<dyn MutableVector>)> =
let mut columns_builders: Vec<(&ColumnSchema, Box<dyn MutableVector>)> =
Vec::with_capacity(columns_num);
if columns.is_empty() {
for column_schema in schema.column_schemas() {
let data_type = &column_schema.data_type;
columns_builders.push((
&column_schema.name,
data_type,
data_type.create_mutable_vector(rows_num),
));
columns_builders.push((column_schema, data_type.create_mutable_vector(rows_num)));
}
} else {
for column_name in columns {
@@ -61,11 +60,7 @@ pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result<Insert
}
})?;
let data_type = &column_schema.data_type;
columns_builders.push((
column_name,
data_type,
data_type.create_mutable_vector(rows_num),
));
columns_builders.push((column_schema, data_type.create_mutable_vector(rows_num)));
}
}
@@ -78,10 +73,8 @@ pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result<Insert
}
);
for (sql_val, (column_name, data_type, builder)) in
row.iter().zip(columns_builders.iter_mut())
{
add_row_to_vector(column_name, data_type, sql_val, builder)?;
for (sql_val, (column_schema, builder)) in row.iter().zip(columns_builders.iter_mut()) {
add_row_to_vector(column_schema, sql_val, builder)?;
}
}
@@ -91,21 +84,33 @@ pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result<Insert
table_name,
columns_values: columns_builders
.into_iter()
.map(|(c, _, mut b)| (c.to_owned(), b.to_vector()))
.map(|(cs, mut b)| (cs.name.to_string(), b.to_vector()))
.collect(),
})
}
fn add_row_to_vector(
column_name: &str,
data_type: &ConcreteDataType,
column_schema: &ColumnSchema,
sql_val: &SqlValue,
builder: &mut Box<dyn MutableVector>,
) -> Result<()> {
let value = statements::sql_value_to_value(column_name, data_type, sql_val)
.context(error::ParseSqlSnafu)?;
builder
.push_value_ref(value.as_value_ref())
.context(BuildVectorSnafu { value })?;
let value = if replace_default(sql_val) {
column_schema
.create_default()
.context(error::ColumnDefaultValueSnafu {
column: column_schema.name.to_string(),
})?
.context(error::ColumnNoneDefaultValueSnafu {
column: column_schema.name.to_string(),
})?
} else {
statements::sql_value_to_value(&column_schema.name, &column_schema.data_type, sql_val)
.context(error::ParseSqlSnafu)?
};
builder.push_value_ref(value.as_value_ref()).unwrap();
Ok(())
}
fn replace_default(sql_val: &SqlValue) -> bool {
matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
}

View File

@@ -131,6 +131,9 @@ pub enum Error {
#[snafu(backtrace)]
source: api::error::Error,
},
#[snafu(display("Invalid sql value: {}", value))]
InvalidSqlValue { value: String, backtrace: Backtrace },
}
impl ErrorExt for Error {
@@ -156,6 +159,7 @@ impl ErrorExt for Error {
UnsupportedAlterTableStatement { .. } => StatusCode::InvalidSyntax,
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),
InvalidSqlValue { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -39,8 +39,9 @@ use crate::ast::{
Value as SqlValue,
};
use crate::error::{
self, ColumnTypeMismatchSnafu, ConvertToGrpcDataTypeSnafu, ParseSqlValueSnafu, Result,
SerializeColumnDefaultConstraintSnafu, UnsupportedDefaultValueSnafu,
self, ColumnTypeMismatchSnafu, ConvertToGrpcDataTypeSnafu, InvalidSqlValueSnafu,
ParseSqlValueSnafu, Result, SerializeColumnDefaultConstraintSnafu,
UnsupportedDefaultValueSnafu,
};
// TODO(LFC): Get rid of this function, use session context aware version of "table_idents_to_full_name" instead.
@@ -222,6 +223,7 @@ pub fn sql_value_to_value(
parse_string_to_value(column_name, s.to_owned(), data_type)?
}
SqlValue::HexStringLiteral(s) => parse_hex_string(s)?,
SqlValue::Placeholder(s) => return InvalidSqlValueSnafu { value: s }.fail(),
_ => todo!("Other sql value"),
})
}
@@ -720,4 +722,14 @@ mod tests {
assert!(!column_schema.is_nullable());
assert!(!column_schema.is_time_index());
}
#[test]
pub fn test_parse_placeholder_value() {
assert!(sql_value_to_value(
"test",
&ConcreteDataType::string_datatype(),
&SqlValue::Placeholder("default".into())
)
.is_err());
}
}

View File

@@ -66,7 +66,13 @@ fn sql_exprs_to_values(exprs: &Vec<Vec<Expr>>) -> Result<Vec<Vec<Value>>> {
for expr in es.iter() {
vs.push(match expr {
Expr::Value(v) => v.clone(),
Expr::Identifier(ident) => Value::SingleQuotedString(ident.value.clone()),
Expr::Identifier(ident) => {
if ident.quote_style.is_none() {
Value::Placeholder(ident.value.clone())
} else {
Value::SingleQuotedString(ident.value.clone())
}
}
Expr::UnaryOp { op, expr }
if matches!(op, UnaryOperator::Minus | UnaryOperator::Plus) =>
{
@@ -146,4 +152,61 @@ mod tests {
_ => unreachable!(),
}
}
#[test]
fn test_insert_value_with_default() {
use crate::statements::statement::Statement;
// insert "default"
let sql = "INSERT INTO my_table VALUES(default)";
let stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
.unwrap()
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values().unwrap();
assert_eq!(values, vec![vec![Value::Placeholder("default".to_owned())]]);
}
_ => unreachable!(),
}
}
#[test]
fn test_insert_value_with_default_uppercase() {
use crate::statements::statement::Statement;
// insert "DEFAULT"
let sql = "INSERT INTO my_table VALUES(DEFAULT)";
let stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
.unwrap()
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values().unwrap();
assert_eq!(values, vec![vec![Value::Placeholder("DEFAULT".to_owned())]]);
}
_ => unreachable!(),
}
}
#[test]
fn test_insert_value_with_quoted_string() {
use crate::statements::statement::Statement;
// insert "'default'"
let sql = "INSERT INTO my_table VALUES('default')";
let stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
.unwrap()
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values().unwrap();
assert_eq!(
values,
vec![vec![Value::SingleQuotedString("default".to_owned())]]
);
}
_ => unreachable!(),
}
}
}