diff --git a/src/frontend/src/inserter.rs b/src/frontend/src/inserter.rs new file mode 100644 index 0000000000..078a82984a --- /dev/null +++ b/src/frontend/src/inserter.rs @@ -0,0 +1,554 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::helper::ColumnDataTypeWrapper; +use api::v1::alter_expr::Kind; +use api::v1::ddl_request::Expr as DdlExpr; +use api::v1::greptime_request::Request; +use api::v1::value::ValueData; +use api::v1::{ + AlterExpr, Column, ColumnDataType, ColumnSchema, DdlRequest, InsertRequest, InsertRequests, + Row, RowInsertRequest, RowInsertRequests, Rows, Value, +}; +use catalog::CatalogManagerRef; +use common_base::BitVec; +use common_catalog::consts::default_engine; +use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; +use common_query::Output; +use common_telemetry::info; +use datatypes::schema::Schema; +use servers::query_handler::grpc::GrpcQueryHandlerRef; +use session::context::QueryContextRef; +use snafu::prelude::*; +use table::engine::TableReference; +use table::TableRef; + +use crate::error::{ + CatalogSnafu, ColumnDataTypeSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, + InvalidInsertRequestSnafu, Result, +}; +use crate::expr_factory::CreateExprFactory; + +pub(crate) struct Inserter<'a> { + catalog_manager: &'a CatalogManagerRef, + create_expr_factory: &'a CreateExprFactory, + grpc_query_handler: &'a GrpcQueryHandlerRef, +} + +impl<'a> Inserter<'a> { + pub fn new( + catalog_manager: &'a CatalogManagerRef, + create_expr_factory: &'a CreateExprFactory, + grpc_query_handler: &'a GrpcQueryHandlerRef, + ) -> Self { + Self { + catalog_manager, + create_expr_factory, + grpc_query_handler, + } + } + + pub async fn handle_column_inserts( + &self, + requests: InsertRequests, + ctx: QueryContextRef, + ) -> Result { + let row_inserts = requests_column_to_row(requests)?; + self.handle_row_inserts(row_inserts, ctx).await + } + + pub async fn handle_row_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> Result { + requests.inserts.iter().try_for_each(|req| { + let non_empty = req.rows.as_ref().map(|r| !r.rows.is_empty()); + let non_empty = non_empty.unwrap_or_default(); + non_empty.then_some(()).with_context(|| EmptyDataSnafu { + msg: format!("insert to table: {:?}", &req.table_name), + }) + })?; + + self.create_or_alter_tables_on_demand(&requests, ctx.clone()) + .await?; + let query = Request::RowInserts(requests); + self.grpc_query_handler.do_query(query, ctx).await + } +} + +impl<'a> Inserter<'a> { + // check if tables already exist: + // - if table does not exist, create table by inferred CreateExpr + // - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` + async fn create_or_alter_tables_on_demand( + &self, + requests: &RowInsertRequests, + ctx: QueryContextRef, + ) -> Result<()> { + // TODO(jeremy): create and alter in batch? + for req in &requests.inserts { + match self.get_table(req, &ctx).await? { + Some(table) => { + validate_request_with_table(req, &table)?; + self.alter_table_on_demand(req, table, &ctx).await? + } + None => self.create_table(req, &ctx).await?, + } + } + + Ok(()) + } + + async fn get_table( + &self, + req: &RowInsertRequest, + ctx: &QueryContextRef, + ) -> Result> { + self.catalog_manager + .table(ctx.current_catalog(), ctx.current_schema(), &req.table_name) + .await + .context(CatalogSnafu) + } + + async fn alter_table_on_demand( + &self, + req: &RowInsertRequest, + table: TableRef, + ctx: &QueryContextRef, + ) -> Result<()> { + let catalog_name = ctx.current_catalog(); + let schema_name = ctx.current_schema(); + + let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); + let column_exprs = ColumnExpr::from_column_schemas(request_schema); + let add_columns = extract_new_columns(&table.schema(), column_exprs) + .context(FindNewColumnsOnInsertionSnafu)?; + let Some(add_columns) = add_columns else { + return Ok(()); + }; + + let table_name = table.table_info().name.clone(); + info!( + "Adding new columns: {:?} to table: {}.{}.{}", + add_columns, catalog_name, schema_name, table_name + ); + + let alter_table_expr = AlterExpr { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + kind: Some(Kind::AddColumns(add_columns)), + ..Default::default() + }; + + let req = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(alter_table_expr)), + }); + self.grpc_query_handler.do_query(req, ctx.clone()).await?; + + info!( + "Successfully added new columns to table: {}.{}.{}", + catalog_name, schema_name, table_name + ); + + Ok(()) + } + + async fn create_table(&self, req: &RowInsertRequest, ctx: &QueryContextRef) -> Result<()> { + let table_ref = + TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name); + let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); + + info!( + "Table {}.{}.{} does not exist, try create table", + table_ref.catalog, table_ref.schema, table_ref.table, + ); + + let create_table_expr = self + .create_expr_factory + .create_table_expr_by_column_schemas(&table_ref, request_schema, default_engine())?; + + let req = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(create_table_expr)), + }); + self.grpc_query_handler.do_query(req, ctx.clone()).await?; + + info!( + "Successfully created table on insertion: {}.{}.{}", + table_ref.catalog, table_ref.schema, table_ref.table, + ); + + Ok(()) + } +} + +fn requests_column_to_row(requests: InsertRequests) -> Result { + requests + .inserts + .into_iter() + .map(request_column_to_row) + .collect::>>() + .map(|inserts| RowInsertRequests { inserts }) +} + +fn request_column_to_row(request: InsertRequest) -> Result { + let row_count = request.row_count as usize; + let column_count = request.columns.len(); + let mut schema = Vec::with_capacity(column_count); + let mut rows = vec![ + Row { + values: Vec::with_capacity(column_count) + }; + row_count + ]; + for column in request.columns { + let column_schema = ColumnSchema { + column_name: column.column_name.clone(), + datatype: column.datatype, + semantic_type: column.semantic_type, + }; + schema.push(column_schema); + + push_column_to_rows(column, &mut rows)?; + } + + Ok(RowInsertRequest { + table_name: request.table_name, + rows: Some(Rows { schema, rows }), + region_number: request.region_number, + }) +} + +fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { + let null_mask = BitVec::from_vec(column.null_mask); + let column_type = ColumnDataTypeWrapper::try_new(column.datatype) + .context(ColumnDataTypeSnafu)? + .datatype(); + let column_values = column.values.unwrap_or_default(); + + macro_rules! push_column_values_match_types { + ($( ($arm:tt, $value_data_variant:tt, $field_name:tt), )*) => { match column_type { $( + + ColumnDataType::$arm => { + let row_count = rows.len(); + let actual_row_count = null_mask.count_ones() + column_values.$field_name.len(); + ensure!( + actual_row_count == row_count, + InvalidInsertRequestSnafu { + reason: format!( + "Expecting {} rows of data for column '{}', but got {}.", + row_count, column.column_name, actual_row_count + ), + } + ); + + let mut null_mask_iter = null_mask.into_iter(); + let mut values_iter = column_values.$field_name.into_iter(); + + for row in rows { + let value_is_null = null_mask_iter.next(); + if value_is_null == Some(true) { + row.values.push(Value { value_data: None }); + } else { + // previous check ensures that there is a value for each row + let value = values_iter.next().unwrap(); + row.values.push(Value { + value_data: Some(ValueData::$value_data_variant(value)), + }); + } + } + } + + )* }} + } + + push_column_values_match_types!( + (Boolean, BoolValue, bool_values), + (Int8, I8Value, i8_values), + (Int16, I16Value, i16_values), + (Int32, I32Value, i32_values), + (Int64, I64Value, i64_values), + (Uint8, U8Value, u8_values), + (Uint16, U16Value, u16_values), + (Uint32, U32Value, u32_values), + (Uint64, U64Value, u64_values), + (Float32, F32Value, f32_values), + (Float64, F64Value, f64_values), + (Binary, BinaryValue, binary_values), + (String, StringValue, string_values), + (Date, DateValue, date_values), + (Datetime, DatetimeValue, datetime_values), + (TimestampSecond, TsSecondValue, ts_second_values), + ( + TimestampMillisecond, + TsMillisecondValue, + ts_millisecond_values + ), + ( + TimestampMicrosecond, + TsMicrosecondValue, + ts_microsecond_values + ), + (TimestampNanosecond, TsNanosecondValue, ts_nanosecond_values), + (TimeSecond, TimeSecondValue, time_second_values), + ( + TimeMillisecond, + TimeMillisecondValue, + time_millisecond_values + ), + ( + TimeMicrosecond, + TimeMicrosecondValue, + time_microsecond_values + ), + (TimeNanosecond, TimeNanosecondValue, time_nanosecond_values), + ( + IntervalYearMonth, + IntervalYearMonthValues, + interval_year_month_values + ), + ( + IntervalDayTime, + IntervalDayTimeValues, + interval_day_time_values + ), + ( + IntervalMonthDayNano, + IntervalMonthDayNanoValues, + interval_month_day_nano_values + ), + ); + + Ok(()) +} + +fn validate_request_with_table(req: &RowInsertRequest, table: &TableRef) -> Result<()> { + let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); + let table_schema = table.schema(); + + validate_required_columns(request_schema, &table_schema)?; + + Ok(()) +} + +fn validate_required_columns(request_schema: &[ColumnSchema], table_schema: &Schema) -> Result<()> { + for column_schema in table_schema.column_schemas() { + if column_schema.is_nullable() || column_schema.default_constraint().is_some() { + continue; + } + if !request_schema + .iter() + .any(|c| c.column_name == column_schema.name) + { + return InvalidInsertRequestSnafu { + reason: format!( + "Expecting insert data to be presented on a not null or no default value column '{}'.", + &column_schema.name + ) + }.fail(); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use api::v1::column::Values; + use api::v1::SemanticType; + use common_base::bit_vec::prelude::*; + use datatypes::prelude::{ConcreteDataType, Value as DtValue}; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema}; + + use super::*; + + #[test] + fn test_validate_required_columns() { + let schema = Schema::new(vec![ + DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(None) + .unwrap(), + DtColumnSchema::new("b", ConcreteDataType::int32_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(DtValue::Int32(100)))) + .unwrap(), + ]); + let request_schema = &[ColumnSchema { + column_name: "c".to_string(), + ..Default::default() + }]; + // If nullable is true, it doesn't matter whether the insert request has the column. + validate_required_columns(request_schema, &schema).unwrap(); + + let schema = Schema::new(vec![ + DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), false) + .with_default_constraint(None) + .unwrap(), + DtColumnSchema::new("b", ConcreteDataType::int32_datatype(), false) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(DtValue::Int32(-100)))) + .unwrap(), + ]); + let request_schema = &[ColumnSchema { + column_name: "a".to_string(), + ..Default::default() + }]; + // If nullable is false, but the column is defined with default value, + // it also doesn't matter whether the insert request has the column. + validate_required_columns(request_schema, &schema).unwrap(); + + let request_schema = &[ColumnSchema { + column_name: "b".to_string(), + ..Default::default() + }]; + // Neither of the above cases. + assert!(validate_required_columns(request_schema, &schema).is_err()); + } + + #[test] + fn test_request_column_to_row() { + let insert_request = InsertRequest { + table_name: String::from("test_table"), + row_count: 3, + region_number: 1, + columns: vec![ + Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(), + values: Some(Values { + i32_values: vec![42], + ..Default::default() + }), + }, + Column { + column_name: String::from("col2"), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + null_mask: vec![], + values: Some(Values { + string_values: vec![ + String::from("value1"), + String::from("value2"), + String::from("value3"), + ], + ..Default::default() + }), + }, + ], + }; + + let result = request_column_to_row(insert_request); + let row_insert_request = result.unwrap(); + assert_eq!(row_insert_request.table_name, "test_table"); + assert_eq!(row_insert_request.region_number, 1); + let rows = row_insert_request.rows.unwrap(); + assert_eq!( + rows.schema, + vec![ + ColumnSchema { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + }, + ColumnSchema { + column_name: String::from("col2"), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + }, + ] + ); + assert_eq!( + rows.rows, + vec![ + Row { + values: vec![ + Value { value_data: None }, + Value { + value_data: Some(ValueData::StringValue(String::from("value1"))), + }, + ], + }, + Row { + values: vec![ + Value { + value_data: Some(ValueData::I32Value(42)), + }, + Value { + value_data: Some(ValueData::StringValue(String::from("value2"))), + }, + ], + }, + Row { + values: vec![ + Value { value_data: None }, + Value { + value_data: Some(ValueData::StringValue(String::from("value3"))), + }, + ], + }, + ] + ); + + let invalid_request_with_wrong_type = InsertRequest { + table_name: String::from("test_table"), + row_count: 3, + region_number: 1, + columns: vec![Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(), + values: Some(Values { + i8_values: vec![42], + ..Default::default() + }), + }], + }; + assert!(request_column_to_row(invalid_request_with_wrong_type).is_err()); + + let invalid_request_with_wrong_row_count = InsertRequest { + table_name: String::from("test_table"), + row_count: 3, + region_number: 1, + columns: vec![Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: bitvec![u8, Lsb0; 0, 0, 1].into_vec(), + values: Some(Values { + i32_values: vec![42], + ..Default::default() + }), + }], + }; + assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err()); + + let invalid_request_with_wrong_row_count = InsertRequest { + table_name: String::from("test_table"), + row_count: 3, + region_number: 1, + columns: vec![Column { + column_name: String::from("col1"), + datatype: ColumnDataType::Int32.into(), + semantic_type: SemanticType::Field.into(), + null_mask: vec![], + values: Some(Values { + i32_values: vec![42], + ..Default::default() + }), + }], + }; + assert!(request_column_to_row(invalid_request_with_wrong_row_count).is_err()); + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 3db236e0e8..2ee768af5b 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -25,31 +25,24 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use api::v1::alter_expr::Kind; -use api::v1::ddl_request::Expr as DdlExpr; -use api::v1::greptime_request::Request; use api::v1::meta::Role; -use api::v1::{ - AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests, RowInsertRequests, -}; +use api::v1::{InsertRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use catalog::remote::CachedMetaKvBackend; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; use common_base::Plugins; -use common_catalog::consts::default_engine; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::key::TableMetadataManager; use common_query::Output; -use common_telemetry::logging::{debug, info}; +use common_telemetry::logging::info; use common_telemetry::{error, timer}; use datanode::instance::sql::table_idents_to_full_name; use datanode::instance::InstanceRef as DnInstanceRef; -use datatypes::schema::Schema; use distributed::DistInstance; use meta_client::client::{MetaClient, MetaClientBuilder}; use partition::manager::PartitionRuleManager; @@ -78,21 +71,19 @@ use sql::parser::ParserContext; use sql::statements::copy::CopyTable; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; -use table::engine::TableReference; use crate::catalog::FrontendCatalogManager; use crate::error::{ - self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, - InvalidInsertRequestSnafu, MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, - PlanStatementSnafu, Result, SqlExecInterceptedSnafu, + self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, + ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, }; use crate::expr_factory::CreateExprFactory; use crate::frontend::FrontendOptions; use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; +use crate::inserter::Inserter; use crate::instance::standalone::StandaloneGrpcQueryHandler; use crate::metrics; -use crate::row_inserter::RowInserter; use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; use crate::statement::StatementExecutor; @@ -130,7 +121,6 @@ pub struct Instance { plugins: Arc, servers: Arc, heartbeat_task: Option, - row_inserter: Arc, } impl Instance { @@ -209,12 +199,6 @@ impl Instance { let create_expr_factory = CreateExprFactory; - let row_inserter = Arc::new(RowInserter::new( - catalog_manager.clone(), - create_expr_factory, - dist_instance.clone(), - )); - Ok(Instance { catalog_manager, script_executor, @@ -225,7 +209,6 @@ impl Instance { plugins: plugins.clone(), servers: Arc::new(HashMap::new()), heartbeat_task, - row_inserter, }) } @@ -281,12 +264,6 @@ impl Instance { let create_expr_factory = CreateExprFactory; let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone()); - let row_inserter = Arc::new(RowInserter::new( - catalog_manager.clone(), - create_expr_factory, - grpc_query_handler.clone(), - )); - Ok(Instance { catalog_manager: catalog_manager.clone(), script_executor, @@ -297,7 +274,6 @@ impl Instance { plugins: Default::default(), servers: Arc::new(HashMap::new()), heartbeat_task: None, - row_inserter, }) } @@ -318,7 +294,12 @@ impl Instance { requests: RowInsertRequests, ctx: QueryContextRef, ) -> Result { - self.row_inserter.handle_inserts(requests, ctx).await + let inserter = Inserter::new( + &self.catalog_manager, + &self.create_expr_factory, + &self.grpc_query_handler, + ); + inserter.handle_row_inserts(requests, ctx).await } /// Handle batch inserts @@ -327,130 +308,12 @@ impl Instance { requests: InsertRequests, ctx: QueryContextRef, ) -> Result { - for req in requests.inserts.iter() { - self.create_or_alter_table_on_demand(ctx.clone(), req) - .await?; - } - - let query = Request::Inserts(requests); - GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await - } - - // check if table already exist: - // - if table does not exist, create table by inferred CreateExpr - // - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` - async fn create_or_alter_table_on_demand( - &self, - ctx: QueryContextRef, - request: &InsertRequest, - ) -> Result<()> { - let catalog_name = &ctx.current_catalog().to_owned(); - let schema_name = &ctx.current_schema().to_owned(); - let table_name = &request.table_name; - let columns = &request.columns; - - let table = self - .catalog_manager - .table(catalog_name, schema_name, table_name) - .await - .context(error::CatalogSnafu)?; - match table { - None => { - info!( - "Table {}.{}.{} does not exist, try create table", - catalog_name, schema_name, table_name, - ); - let _ = self - .create_table_by_columns(ctx, table_name, columns, default_engine()) - .await?; - info!( - "Successfully created table on insertion: {}.{}.{}", - catalog_name, schema_name, table_name - ); - } - Some(table) => { - let schema = table.schema(); - - validate_insert_request(schema.as_ref(), request)?; - - if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns) - .context(error::FindNewColumnsOnInsertionSnafu)? - { - info!( - "Find new columns {:?} on insertion, try to alter table: {}.{}.{}", - add_columns, catalog_name, schema_name, table_name - ); - let _ = self - .add_new_columns_to_table(ctx, table_name, add_columns) - .await?; - info!( - "Successfully altered table on insertion: {}.{}.{}", - catalog_name, schema_name, table_name - ); - } - } - }; - Ok(()) - } - - /// Infer create table expr from inserting data - async fn create_table_by_columns( - &self, - ctx: QueryContextRef, - table_name: &str, - columns: &[Column], - engine: &str, - ) -> Result { - let catalog_name = ctx.current_catalog(); - let schema_name = ctx.current_schema(); - - // Create table automatically, build schema from data. - let table_name = TableReference::full(catalog_name, schema_name, table_name); - let create_expr = - self.create_expr_factory - .create_table_expr_by_columns(&table_name, columns, engine)?; - - info!( - "Try to create table: {} automatically with request: {:?}", - table_name, create_expr, + let inserter = Inserter::new( + &self.catalog_manager, + &self.create_expr_factory, + &self.grpc_query_handler, ); - - self.grpc_query_handler - .do_query( - Request::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(create_expr)), - }), - ctx, - ) - .await - } - - async fn add_new_columns_to_table( - &self, - ctx: QueryContextRef, - table_name: &str, - add_columns: AddColumns, - ) -> Result { - debug!( - "Adding new columns: {:?} to table: {}", - add_columns, table_name - ); - let expr = AlterExpr { - catalog_name: ctx.current_catalog().to_owned(), - schema_name: ctx.current_schema().to_owned(), - table_name: table_name.to_string(), - kind: Some(Kind::AddColumns(add_columns)), - ..Default::default() - }; - - self.grpc_query_handler - .do_query( - Request::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(expr)), - }), - ctx, - ) - .await + inserter.handle_column_inserts(requests, ctx).await } pub fn set_plugins(&mut self, map: Arc) { @@ -737,36 +600,10 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> .context(SqlExecInterceptedSnafu) } -fn validate_insert_request(schema: &Schema, request: &InsertRequest) -> Result<()> { - for column_schema in schema.column_schemas() { - if column_schema.is_nullable() || column_schema.default_constraint().is_some() { - continue; - } - let not_null = request - .columns - .iter() - .find(|x| x.column_name == column_schema.name) - .map(|column| column.null_mask.is_empty() || column.null_mask.iter().all(|x| *x == 0)); - ensure!( - not_null == Some(true), - InvalidInsertRequestSnafu { - reason: format!( - "Expecting insert data to be presented on a not null or no default value column '{}'.", - &column_schema.name - ) - } - ); - } - Ok(()) -} - #[cfg(test)] mod tests { use std::collections::HashMap; - use api::v1::column::Values; - use datatypes::prelude::{ConcreteDataType, Value}; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use query::query_engine::options::QueryOptions; use session::context::QueryContext; use sql::dialect::GreptimeDbDialect; @@ -774,71 +611,6 @@ mod tests { use super::*; - #[test] - fn test_validate_insert_request() { - let schema = Schema::new(vec![ - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true) - .with_default_constraint(None) - .unwrap(), - ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(100)))) - .unwrap(), - ]); - let request = InsertRequest { - columns: vec![Column { - column_name: "c".to_string(), - values: Some(Values { - i32_values: vec![1], - ..Default::default() - }), - null_mask: vec![0], - ..Default::default() - }], - ..Default::default() - }; - // If nullable is true, it doesn't matter whether the insert request has the column. - validate_insert_request(&schema, &request).unwrap(); - - let schema = Schema::new(vec![ - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false) - .with_default_constraint(None) - .unwrap(), - ColumnSchema::new("b", ConcreteDataType::int32_datatype(), false) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(-100)))) - .unwrap(), - ]); - let request = InsertRequest { - columns: vec![Column { - column_name: "a".to_string(), - values: Some(Values { - i32_values: vec![1], - ..Default::default() - }), - null_mask: vec![0], - ..Default::default() - }], - ..Default::default() - }; - // If nullable is false, but the column is defined with default value, - // it also doesn't matter whether the insert request has the column. - validate_insert_request(&schema, &request).unwrap(); - - let request = InsertRequest { - columns: vec![Column { - column_name: "b".to_string(), - values: Some(Values { - i32_values: vec![1], - ..Default::default() - }), - null_mask: vec![0], - ..Default::default() - }], - ..Default::default() - }; - // Neither of the above cases. - assert!(validate_insert_request(&schema, &request).is_err()); - } - #[test] fn test_exec_validation() { let query_ctx = QueryContext::arc(); diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index ab76e8d3a1..261b049f20 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -20,9 +20,9 @@ pub mod error; pub mod expr_factory; pub mod frontend; pub mod heartbeat; +pub(crate) mod inserter; pub mod instance; pub(crate) mod metrics; -mod row_inserter; mod script; mod server; pub mod service_config; diff --git a/src/frontend/src/row_inserter.rs b/src/frontend/src/row_inserter.rs deleted file mode 100644 index 803ca52dc5..0000000000 --- a/src/frontend/src/row_inserter.rs +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::alter_expr::Kind; -use api::v1::ddl_request::Expr; -use api::v1::greptime_request::Request; -use api::v1::{AlterExpr, ColumnSchema, DdlRequest, Row, RowInsertRequest, RowInsertRequests}; -use catalog::CatalogManagerRef; -use common_catalog::consts::default_engine; -use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; -use common_query::Output; -use common_telemetry::info; -use servers::query_handler::grpc::GrpcQueryHandlerRef; -use session::context::QueryContextRef; -use snafu::{ensure, OptionExt, ResultExt}; -use table::engine::TableReference; -use table::TableRef; - -use crate::error::{CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, Result}; -use crate::expr_factory::CreateExprFactory; - -pub struct RowInserter { - catalog_manager: CatalogManagerRef, - create_expr_factory: CreateExprFactory, - grpc_query_handler: GrpcQueryHandlerRef, -} - -impl RowInserter { - pub fn new( - catalog_manager: CatalogManagerRef, - create_expr_factory: CreateExprFactory, - grpc_query_handler: GrpcQueryHandlerRef, - ) -> Self { - Self { - catalog_manager, - create_expr_factory, - grpc_query_handler, - } - } - - pub async fn handle_inserts( - &self, - requests: RowInsertRequests, - ctx: QueryContextRef, - ) -> Result { - self.create_or_alter_tables_on_demand(&requests, ctx.clone()) - .await?; - let query = Request::RowInserts(requests); - self.grpc_query_handler.do_query(query, ctx).await - } - - // check if tables already exist: - // - if table does not exist, create table by inferred CreateExpr - // - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` - async fn create_or_alter_tables_on_demand( - &self, - requests: &RowInsertRequests, - ctx: QueryContextRef, - ) -> Result<()> { - let catalog_name = ctx.current_catalog(); - let schema_name = ctx.current_schema(); - - // TODO(jeremy): create and alter in batch? - for req in &requests.inserts { - let table_name = &req.table_name; - let table = self - .catalog_manager - .table(catalog_name, schema_name, table_name) - .await - .context(CatalogSnafu)?; - match table { - Some(table) => { - self.alter_table_on_demand(catalog_name, schema_name, table, req, ctx.clone()) - .await? - } - None => { - let table_name = TableReference::full(catalog_name, schema_name, table_name); - self.create_table(&table_name, req, ctx.clone()).await? - } - } - } - - Ok(()) - } - - async fn create_table( - &self, - table_name: &TableReference<'_>, - req: &RowInsertRequest, - ctx: QueryContextRef, - ) -> Result<()> { - let (column_schemas, _) = extract_schema_and_rows(req)?; - let create_table_expr = self - .create_expr_factory - .create_table_expr_by_column_schemas(table_name, column_schemas, default_engine())?; - - let req = Request::Ddl(DdlRequest { - expr: Some(Expr::CreateTable(create_table_expr)), - }); - self.grpc_query_handler.do_query(req, ctx).await?; - - Ok(()) - } - - async fn alter_table_on_demand( - &self, - catalog_name: &str, - schema_name: &str, - table: TableRef, - req: &RowInsertRequest, - ctx: QueryContextRef, - ) -> Result<()> { - let (column_schemas, _) = extract_schema_and_rows(req)?; - let column_exprs = ColumnExpr::from_column_schemas(column_schemas); - let add_columns = extract_new_columns(&table.schema(), column_exprs) - .context(FindNewColumnsOnInsertionSnafu)?; - let Some(add_columns) = add_columns else { - return Ok(()); - }; - let table_name = table.table_info().name.clone(); - - info!( - "Adding new columns: {:?} to table: {}.{}.{}", - add_columns, catalog_name, schema_name, table_name - ); - - let alter_table_expr = AlterExpr { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name, - kind: Some(Kind::AddColumns(add_columns)), - ..Default::default() - }; - - let req = Request::Ddl(DdlRequest { - expr: Some(Expr::Alter(alter_table_expr)), - }); - self.grpc_query_handler.do_query(req, ctx).await?; - - Ok(()) - } -} - -fn extract_schema_and_rows(req: &RowInsertRequest) -> Result<(&[ColumnSchema], &[Row])> { - let rows = req.rows.as_ref().with_context(|| EmptyDataSnafu { - msg: format!("insert to table: {:?}", &req.table_name), - })?; - let schema = &rows.schema; - let rows = &rows.rows; - - ensure!( - !rows.is_empty(), - EmptyDataSnafu { - msg: format!("insert to table: {:?}", &req.table_name), - } - ); - - Ok((schema, rows)) -}