From f790fa05c16017ad21c10653cdb90306bcb32fac Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 13 Mar 2023 11:03:51 +0800 Subject: [PATCH] fix: validate insert request (#1142) * fix: validate that the gRPC insert request has a value when required by the column schema, before actually making any change to the DB * fix: resolve PR comments --- src/frontend/src/instance.rs | 104 +++++++++++++++++++++++++++++++++-- 1 file changed, 99 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 2bc57a1ff6..0f4cde7f4d 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -66,8 +66,8 @@ use sql::statements::statement::Statement; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ - self, Error, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, NotSupportedSnafu, - ParseSqlSnafu, Result, SqlExecInterceptedSnafu, + self, Error, ExecutePromqlSnafu, ExternalSnafu, InvalidInsertRequestSnafu, + MissingMetasrvOptsSnafu, NotSupportedSnafu, ParseSqlSnafu, Result, SqlExecInterceptedSnafu, }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; @@ -231,7 +231,7 @@ impl Instance { } async fn handle_insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result { - self.create_or_alter_table_on_demand(ctx.clone(), &request.table_name, &request.columns) + self.create_or_alter_table_on_demand(ctx.clone(), &request) .await?; let query = Request::Insert(request); @@ -244,11 +244,12 @@ impl Instance { async fn create_or_alter_table_on_demand( &self, ctx: QueryContextRef, - table_name: &str, - columns: &[Column], + request: &InsertRequest, ) -> Result<()> { let catalog_name = &ctx.current_catalog(); let schema_name = &ctx.current_schema(); + let table_name = &request.table_name; + let columns = &request.columns; let table = self .catalog_manager @@ -271,6 +272,8 @@ impl Instance { Some(table) => { let schema 
= table.schema(); + validate_insert_request(schema.as_ref(), request)?; + if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns) .context(error::FindNewColumnsOnInsertionSnafu)? { @@ -616,13 +619,39 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> .context(SqlExecInterceptedSnafu) } +fn validate_insert_request(schema: &Schema, request: &InsertRequest) -> Result<()> { + for column_schema in schema.column_schemas() { + if column_schema.is_nullable() || column_schema.default_constraint().is_some() { + continue; + } + let not_null = request + .columns + .iter() + .find(|x| x.column_name == column_schema.name) + .map(|column| column.null_mask.is_empty() || column.null_mask.iter().all(|x| *x == 0)); + ensure!( + not_null == Some(true), + InvalidInsertRequestSnafu { + reason: format!( + "Expecting insert data to be presented on a not null or no default value column '{}'.", + &column_schema.name + ) + } + ); + } + Ok(()) +} + #[cfg(test)] mod tests { use std::borrow::Cow; use std::collections::HashMap; use std::sync::atomic::AtomicU32; + use api::v1::column::Values; use catalog::helper::{TableGlobalKey, TableGlobalValue}; + use datatypes::prelude::{ConcreteDataType, Value}; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use query::query_engine::options::QueryOptions; use session::context::QueryContext; use strfmt::Format; @@ -632,6 +661,71 @@ mod tests { use crate::tests; use crate::tests::MockDistributedInstance; + #[test] + fn test_validate_insert_request() { + let schema = Schema::new(vec![ + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(None) + .unwrap(), + ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(100)))) + .unwrap(), + ]); + let request = InsertRequest { + columns: vec![Column { + column_name: "c".to_string(), + values: Some(Values { + i32_values: 
vec![1], + ..Default::default() + }), + null_mask: vec![0], + ..Default::default() + }], + ..Default::default() + }; + // If nullable is true, it doesn't matter whether the insert request has the column. + assert!(validate_insert_request(&schema, &request).is_ok()); + + let schema = Schema::new(vec![ + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false) + .with_default_constraint(None) + .unwrap(), + ColumnSchema::new("b", ConcreteDataType::int32_datatype(), false) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(-100)))) + .unwrap(), + ]); + let request = InsertRequest { + columns: vec![Column { + column_name: "a".to_string(), + values: Some(Values { + i32_values: vec![1], + ..Default::default() + }), + null_mask: vec![0], + ..Default::default() + }], + ..Default::default() + }; + // If nullable is false, but the column is defined with default value, + // it also doesn't matter whether the insert request has the column. + assert!(validate_insert_request(&schema, &request).is_ok()); + + let request = InsertRequest { + columns: vec![Column { + column_name: "b".to_string(), + values: Some(Values { + i32_values: vec![1], + ..Default::default() + }), + null_mask: vec![0], + ..Default::default() + }], + ..Default::default() + }; + // Neither of the above cases. + assert!(validate_insert_request(&schema, &request).is_err()); + } + #[test] fn test_exec_validation() { let query_ctx = Arc::new(QueryContext::new());