mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
fix: validate insert request (#1142)
* fix: validate GRPC insert request has the value when required by column schema, before actually made any change to the DB * fix: resolve PR comments
This commit is contained in:
@@ -66,8 +66,8 @@ use sql::statements::statement::Statement;
|
||||
use crate::catalog::FrontendCatalogManager;
|
||||
use crate::datanode::DatanodeClients;
|
||||
use crate::error::{
|
||||
self, Error, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, NotSupportedSnafu,
|
||||
ParseSqlSnafu, Result, SqlExecInterceptedSnafu,
|
||||
self, Error, ExecutePromqlSnafu, ExternalSnafu, InvalidInsertRequestSnafu,
|
||||
MissingMetasrvOptsSnafu, NotSupportedSnafu, ParseSqlSnafu, Result, SqlExecInterceptedSnafu,
|
||||
};
|
||||
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
|
||||
use crate::frontend::FrontendOptions;
|
||||
@@ -231,7 +231,7 @@ impl Instance {
|
||||
}
|
||||
|
||||
async fn handle_insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<Output> {
|
||||
self.create_or_alter_table_on_demand(ctx.clone(), &request.table_name, &request.columns)
|
||||
self.create_or_alter_table_on_demand(ctx.clone(), &request)
|
||||
.await?;
|
||||
|
||||
let query = Request::Insert(request);
|
||||
@@ -244,11 +244,12 @@ impl Instance {
|
||||
async fn create_or_alter_table_on_demand(
|
||||
&self,
|
||||
ctx: QueryContextRef,
|
||||
table_name: &str,
|
||||
columns: &[Column],
|
||||
request: &InsertRequest,
|
||||
) -> Result<()> {
|
||||
let catalog_name = &ctx.current_catalog();
|
||||
let schema_name = &ctx.current_schema();
|
||||
let table_name = &request.table_name;
|
||||
let columns = &request.columns;
|
||||
|
||||
let table = self
|
||||
.catalog_manager
|
||||
@@ -271,6 +272,8 @@ impl Instance {
|
||||
Some(table) => {
|
||||
let schema = table.schema();
|
||||
|
||||
validate_insert_request(schema.as_ref(), request)?;
|
||||
|
||||
if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns)
|
||||
.context(error::FindNewColumnsOnInsertionSnafu)?
|
||||
{
|
||||
@@ -616,13 +619,39 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()>
|
||||
.context(SqlExecInterceptedSnafu)
|
||||
}
|
||||
|
||||
fn validate_insert_request(schema: &Schema, request: &InsertRequest) -> Result<()> {
|
||||
for column_schema in schema.column_schemas() {
|
||||
if column_schema.is_nullable() || column_schema.default_constraint().is_some() {
|
||||
continue;
|
||||
}
|
||||
let not_null = request
|
||||
.columns
|
||||
.iter()
|
||||
.find(|x| x.column_name == column_schema.name)
|
||||
.map(|column| column.null_mask.is_empty() || column.null_mask.iter().all(|x| *x == 0));
|
||||
ensure!(
|
||||
not_null == Some(true),
|
||||
InvalidInsertRequestSnafu {
|
||||
reason: format!(
|
||||
"Expecting insert data to be presented on a not null or no default value column '{}'.",
|
||||
&column_schema.name
|
||||
)
|
||||
}
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
|
||||
use api::v1::column::Values;
|
||||
use catalog::helper::{TableGlobalKey, TableGlobalValue};
|
||||
use datatypes::prelude::{ConcreteDataType, Value};
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
|
||||
use query::query_engine::options::QueryOptions;
|
||||
use session::context::QueryContext;
|
||||
use strfmt::Format;
|
||||
@@ -632,6 +661,71 @@ mod tests {
|
||||
use crate::tests;
|
||||
use crate::tests::MockDistributedInstance;
|
||||
|
||||
#[test]
|
||||
fn test_validate_insert_request() {
|
||||
let schema = Schema::new(vec![
|
||||
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true)
|
||||
.with_default_constraint(None)
|
||||
.unwrap(),
|
||||
ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(100))))
|
||||
.unwrap(),
|
||||
]);
|
||||
let request = InsertRequest {
|
||||
columns: vec![Column {
|
||||
column_name: "c".to_string(),
|
||||
values: Some(Values {
|
||||
i32_values: vec![1],
|
||||
..Default::default()
|
||||
}),
|
||||
null_mask: vec![0],
|
||||
..Default::default()
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
// If nullable is true, it doesn't matter whether the insert request has the column.
|
||||
assert!(validate_insert_request(&schema, &request).is_ok());
|
||||
|
||||
let schema = Schema::new(vec![
|
||||
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false)
|
||||
.with_default_constraint(None)
|
||||
.unwrap(),
|
||||
ColumnSchema::new("b", ConcreteDataType::int32_datatype(), false)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(-100))))
|
||||
.unwrap(),
|
||||
]);
|
||||
let request = InsertRequest {
|
||||
columns: vec![Column {
|
||||
column_name: "a".to_string(),
|
||||
values: Some(Values {
|
||||
i32_values: vec![1],
|
||||
..Default::default()
|
||||
}),
|
||||
null_mask: vec![0],
|
||||
..Default::default()
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
// If nullable is false, but the column is defined with default value,
|
||||
// it also doesn't matter whether the insert request has the column.
|
||||
assert!(validate_insert_request(&schema, &request).is_ok());
|
||||
|
||||
let request = InsertRequest {
|
||||
columns: vec![Column {
|
||||
column_name: "b".to_string(),
|
||||
values: Some(Values {
|
||||
i32_values: vec![1],
|
||||
..Default::default()
|
||||
}),
|
||||
null_mask: vec![0],
|
||||
..Default::default()
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
// Neither of the above cases.
|
||||
assert!(validate_insert_request(&schema, &request).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_exec_validation() {
|
||||
let query_ctx = Arc::new(QueryContext::new());
|
||||
|
||||
Reference in New Issue
Block a user