diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 26c3fbca0b..9f71da6fdd 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_error::ext::BoxedError; use common_error::prelude::*; +use datatypes::prelude::ConcreteDataType; pub type Result = std::result::Result; @@ -70,6 +71,26 @@ pub enum Error { source: datafusion_common::DataFusionError, backtrace: Backtrace, }, + + #[snafu(display("Column {} not exists in table {}", column_name, table_name))] + ColumnNotExists { + column_name: String, + table_name: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to cast vector of type '{:?}' to type '{:?}', source: {}", + from_type, + to_type, + source + ))] + CastVector { + from_type: ConcreteDataType, + to_type: ConcreteDataType, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } impl ErrorExt for Error { @@ -81,11 +102,14 @@ impl ErrorExt for Error { | Error::CreateRecordBatches { .. } | Error::PollStream { .. } | Error::Format { .. } - | Error::InitRecordbatchStream { .. } => StatusCode::Internal, + | Error::InitRecordbatchStream { .. } + | Error::ColumnNotExists { .. } => StatusCode::Internal, Error::External { source } => source.status_code(), - Error::SchemaConversion { source, .. } => source.status_code(), + Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => { + source.status_code() + } } } diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index a795626407..721accaf10 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -12,14 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use datatypes::schema::SchemaRef; use datatypes::value::Value; use datatypes::vectors::{Helper, VectorRef}; use serde::ser::{Error, SerializeStruct}; use serde::{Serialize, Serializer}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; -use crate::error::{self, Result}; +use crate::error::{self, CastVectorSnafu, ColumnNotExistsSnafu, Result}; use crate::DfRecordBatch; /// A two-dimensional batch of column-oriented data with a defined schema. @@ -108,6 +110,41 @@ impl RecordBatch { pub fn rows(&self) -> RecordBatchRowIterator<'_> { RecordBatchRowIterator::new(self) } + + pub fn column_vectors( + &self, + table_name: &str, + table_schema: SchemaRef, + ) -> Result> { + let mut vectors = HashMap::with_capacity(self.num_columns()); + + // column schemas in recordbatch must match its vectors, otherwise it's corrupted + for (vector_schema, vector) in self.schema.column_schemas().iter().zip(self.columns.iter()) + { + let column_name = &vector_schema.name; + let column_schema = + table_schema + .column_schema_by_name(column_name) + .context(ColumnNotExistsSnafu { + table_name, + column_name, + })?; + let vector = if vector_schema.data_type != column_schema.data_type { + vector + .cast(&column_schema.data_type) + .with_context(|_| CastVectorSnafu { + from_type: vector.data_type(), + to_type: column_schema.data_type.clone(), + })? + } else { + vector.clone() + }; + + vectors.insert(column_name.clone(), vector); + } + + Ok(vectors) + } } impl Serialize for RecordBatch { diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index e757a3ff42..615ec444a5 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -53,10 +53,6 @@ impl Instance { .await?; self.sql_handler.insert(request).await } - QueryStatement::Sql(Statement::Delete(delete)) => { - let request = SqlRequest::Delete(*delete); - self.sql_handler.execute(request, query_ctx).await - } QueryStatement::Sql(Statement::CreateDatabase(create_database)) => { let request = CreateDatabaseRequest { db_name: create_database.name.to_string(), @@ -185,6 +181,7 @@ impl Instance { | QueryStatement::Sql(Statement::Explain(_)) | QueryStatement::Sql(Statement::Use(_)) | QueryStatement::Sql(Statement::Tql(_)) + | QueryStatement::Sql(Statement::Delete(_)) | QueryStatement::Promql(_) => unreachable!(), } } diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index a1c4cea4c5..76a7694fc7 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -20,7 +20,6 @@ use common_telemetry::error; use query::sql::{describe_table, show_databases, show_tables}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; -use sql::statements::delete::Delete; use sql::statements::describe::DescribeTable; use sql::statements::show::{ShowDatabases, ShowTables}; use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference}; @@ -36,7 +35,6 @@ mod alter; mod copy_table_from; mod copy_table_to; mod create; -mod delete; mod drop_table; mod flush_table; pub(crate) mod insert; @@ -51,7 +49,6 @@ pub enum SqlRequest { ShowDatabases(ShowDatabases), ShowTables(ShowTables), DescribeTable(DescribeTable), - Delete(Delete), CopyTable(CopyTableRequest), } @@ -89,7 +86,6 @@ impl SqlHandler { SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await, SqlRequest::Alter(req) => self.alter(req).await, SqlRequest::DropTable(req) => self.drop_table(req).await, - SqlRequest::Delete(req) => self.delete(query_ctx.clone(), req).await, SqlRequest::CopyTable(req) => match req.direction { CopyDirection::Export => self.copy_table_to(req).await, CopyDirection::Import => self.copy_table_from(req).await, diff --git a/src/datanode/src/sql/delete.rs b/src/datanode/src/sql/delete.rs deleted file mode 100644 index 4e3c0015c0..0000000000 --- a/src/datanode/src/sql/delete.rs +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::sync::Arc; - -use common_query::Output; -use datatypes::data_type::DataType; -use datatypes::prelude::VectorRef; -use datatypes::vectors::StringVector; -use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; -use sql::ast::{BinaryOperator, Expr, Value}; -use sql::statements::delete::Delete; -use sql::statements::sql_value_to_value; -use table::engine::TableReference; -use table::requests::DeleteRequest; -use table::TableRef; - -use crate::error::{ColumnNotFoundSnafu, DeleteSnafu, InvalidSqlSnafu, NotSupportSqlSnafu, Result}; -use crate::instance::sql::table_idents_to_full_name; -use crate::sql::SqlHandler; - -impl SqlHandler { - pub(crate) async fn delete(&self, query_ctx: QueryContextRef, stmt: Delete) -> Result { - let (catalog_name, schema_name, table_name) = - table_idents_to_full_name(stmt.table_name(), query_ctx)?; - let table_ref = TableReference { - catalog: &catalog_name.to_string(), - schema: &schema_name.to_string(), - table: &table_name.to_string(), - }; - - let table = self.get_table(&table_ref)?; - - let req = DeleteRequest { - key_column_values: parse_selection(stmt.selection(), &table)?, - }; - - let affected_rows = table.delete(req).await.with_context(|_| DeleteSnafu { - table_name: table_ref.to_string(), - })?; - - Ok(Output::AffectedRows(affected_rows)) - } -} - -/// parse selection, currently supported format is `tagkey1 = 'tagvalue1' and 'ts' = 'value'`. -/// (only uses =, and in the where clause and provides all columns needed by the key.) -fn parse_selection( - selection: &Option, - table: &TableRef, -) -> Result> { - let mut key_column_values = HashMap::new(); - if let Some(expr) = selection { - parse_expr(expr, &mut key_column_values, table)?; - } - Ok(key_column_values) -} - -fn parse_expr( - expr: &Expr, - key_column_values: &mut HashMap, - table: &TableRef, -) -> Result<()> { - // match BinaryOp - if let Expr::BinaryOp { left, op, right } = expr { - match (&**left, op, &**right) { - // match And operator - (Expr::BinaryOp { .. }, BinaryOperator::And, Expr::BinaryOp { .. }) => { - parse_expr(left, key_column_values, table)?; - parse_expr(right, key_column_values, table)?; - return Ok(()); - } - // match Eq operator - (Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Value(value)) => { - key_column_values.insert( - column_name.to_string(), - value_to_vector(&column_name.to_string(), value, table)?, - ); - return Ok(()); - } - (Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Identifier(value)) => { - key_column_values.insert( - column_name.to_string(), - Arc::new(StringVector::from(vec![value.to_string()])), - ); - return Ok(()); - } - _ => {} - } - } - NotSupportSqlSnafu { - msg: format!( - "Not support sql expr:{expr},correct format is tagkey1 = tagvalue1 and ts = value" - ), - } - .fail() -} - -/// parse value to vector -fn value_to_vector(column_name: &String, sql_value: &Value, table: &TableRef) -> Result { - let schema = table.schema(); - let column_schema = - schema - .column_schema_by_name(column_name) - .with_context(|| ColumnNotFoundSnafu { - table_name: table.table_info().name.clone(), - column_name: column_name.to_string(), - })?; - let data_type = &column_schema.data_type; - let value = sql_value_to_value(column_name, data_type, sql_value); - match value { - Ok(value) => { - let mut vec = data_type.create_mutable_vector(1); - if vec.try_push_value_ref(value.as_value_ref()).is_err() { - return InvalidSqlSnafu { - msg: format!( - "invalid sql, column name is {column_name}, value is {sql_value}", - ), - } - .fail(); - } - Ok(vec.to_vector()) - } - _ => InvalidSqlSnafu { - msg: format!("invalid sql, column name is {column_name}, value is {sql_value}",), - } - .fail(), - } -} diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index d0627c1646..7474f65285 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -751,7 +751,7 @@ async fn test_delete() { let output = execute_sql( &instance, - "delete from test_table where host = host1 and ts = 1655276557000 ", + "delete from test_table where host = 'host1' and ts = 1655276557000 ", ) .await; assert!(matches!(output, Output::AffectedRows(1))); @@ -981,7 +981,9 @@ async fn try_execute_sql_in_db( let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); match stmt { - QueryStatement::Sql(Statement::Query(_)) => plan_exec(instance, stmt, query_ctx).await, + QueryStatement::Sql(Statement::Query(_)) | QueryStatement::Sql(Statement::Delete(_)) => { + plan_exec(instance, stmt, query_ctx).await + } QueryStatement::Sql(Statement::Insert(ref insert)) if insert.is_insert_select() => { plan_exec(instance, stmt, query_ctx).await } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f07396a5d9..bc2cdd689e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -468,7 +468,9 @@ impl Instance { check_permission(self.plugins.clone(), &stmt, &query_ctx)?; match stmt { - Statement::Query(_) | Statement::Explain(_) => self.plan_exec(stmt, query_ctx).await, + Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { + self.plan_exec(stmt, query_ctx).await + } // For performance consideration, only "insert with select" is executed by query engine. // Plain insert ("insert with values") is still executed directly in statement. @@ -483,7 +485,6 @@ impl Instance { | Statement::ShowTables(_) | Statement::DescribeTable(_) | Statement::Insert(_) - | Statement::Delete(_) | Statement::Alter(_) | Statement::DropTable(_) | Statement::Copy(_) => self @@ -657,8 +658,8 @@ pub fn check_permission( } match stmt { - // query,explain and tql will be checked in QueryEngineState - Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) => {} + // These are executed by query engine, and will be checked there. + Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {} // database ops won't be checked Statement::CreateDatabase(_) | Statement::ShowDatabases(_) | Statement::Use(_) => {} // show create table and alter are not supported yet @@ -683,9 +684,6 @@ pub fn check_permission( Statement::DescribeTable(stmt) => { validate_param(stmt.name(), query_ctx)?; } - Statement::Delete(delete) => { - validate_param(delete.table_name(), query_ctx)?; - } Statement::Copy(stmd) => match stmd { CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?, CopyTable::From(copy_table_from) => { diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 498a104959..31597fa488 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -18,6 +18,7 @@ mod catalog_adapter; mod error; mod planner; +use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; @@ -35,18 +36,20 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::ExecutionPlan; use datafusion_common::ResolvedTableReference; use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp}; +use datatypes::prelude::VectorRef; use datatypes::schema::Schema; use futures_util::StreamExt; use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; -use table::requests::InsertRequest; +use snafu::{ensure, OptionExt, ResultExt}; +use table::requests::{DeleteRequest, InsertRequest}; use table::TableRef; pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::error::{ CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, - QueryExecutionSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, UnsupportedExprSnafu, + MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, SchemaNotFoundSnafu, + TableNotFoundSnafu, UnsupportedExprSnafu, }; use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; @@ -76,11 +79,18 @@ impl DatafusionQueryEngine { Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?)) } - async fn exec_insert_plan( + async fn exec_dml_statement( &self, - dml: &DmlStatement, + dml: DmlStatement, query_ctx: QueryContextRef, ) -> Result { + ensure!( + matches!(dml.op, WriteOp::Insert | WriteOp::Delete), + UnsupportedExprSnafu { + name: format!("DML op {}", dml.op), + } + ); + let default_catalog = query_ctx.current_catalog(); let default_schema = query_ctx.current_schema(); let table_name = dml @@ -101,20 +111,75 @@ impl DatafusionQueryEngine { let mut affected_rows = 0; while let Some(batch) = stream.next().await { let batch = batch.context(CreateRecordBatchSnafu)?; - let request = InsertRequest::try_from_recordbatch(&table_name, table.schema(), batch) + let column_vectors = batch + .column_vectors(&table_name.to_string(), table.schema()) .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; - let rows = table - .insert(request) - .await - .map_err(BoxedError::new) - .context(QueryExecutionSnafu)?; + let rows = match dml.op { + WriteOp::Insert => Self::insert(&table_name, &table, column_vectors).await?, + WriteOp::Delete => Self::delete(&table_name, &table, column_vectors).await?, + _ => unreachable!("guarded by the 'ensure!' at the beginning"), + }; affected_rows += rows; } Ok(Output::AffectedRows(affected_rows)) } + async fn delete<'a>( + table_name: &ResolvedTableReference<'a>, + table: &TableRef, + column_vectors: HashMap, + ) -> Result { + let table_schema = table.schema(); + let ts_column = table_schema + .timestamp_column() + .map(|x| &x.name) + .with_context(|| MissingTimestampColumnSnafu { + table_name: table_name.to_string(), + })?; + + let table_info = table.table_info(); + let rowkey_columns = table_info + .meta + .row_key_column_names() + .collect::>(); + let column_vectors = column_vectors + .into_iter() + .filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0)) + .collect::>(); + + let request = DeleteRequest { + key_column_values: column_vectors, + }; + + table + .delete(request) + .await + .map_err(BoxedError::new) + .context(QueryExecutionSnafu) + } + + async fn insert<'a>( + table_name: &ResolvedTableReference<'a>, + table: &TableRef, + column_vectors: HashMap, + ) -> Result { + let request = InsertRequest { + catalog_name: table_name.catalog.to_string(), + schema_name: table_name.schema.to_string(), + table_name: table_name.table.to_string(), + columns_values: column_vectors, + region_number: 0, + }; + + table + .insert(request) + .await + .map_err(BoxedError::new) + .context(QueryExecutionSnafu) + } + async fn find_table(&self, table_name: &ResolvedTableReference<'_>) -> Result { let catalog_name = table_name.catalog.as_ref(); let schema_name = table_name.schema.as_ref(); @@ -163,13 +228,9 @@ impl QueryEngine for DatafusionQueryEngine { async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result { match plan { - LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => match dml.op { - WriteOp::Insert => self.exec_insert_plan(&dml, query_ctx).await, - _ => UnsupportedExprSnafu { - name: format!("DML op {}", dml.op), - } - .fail(), - }, + LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => { + self.exec_dml_statement(dml, query_ctx).await + } _ => self.exec_query_plan(plan).await, } } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index b373a5d34d..fde14f2a4c 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -110,6 +110,12 @@ pub enum Error { source: DataFusionError, backtrace: Backtrace, }, + + #[snafu(display("Timestamp column for table '{table_name}' is missing!"))] + MissingTimestampColumn { + table_name: String, + backtrace: Backtrace, + }, } impl ErrorExt for Error { @@ -131,7 +137,7 @@ impl ErrorExt for Error { } CreateRecordBatch { source } => source.status_code(), QueryExecution { source } | QueryPlan { source } => source.status_code(), - DataFusion { .. } => StatusCode::Internal, + DataFusion { .. } | MissingTimestampColumn { .. } => StatusCode::Internal, Sql { source } => source.status_code(), PlanSql { .. } => StatusCode::PlanQuery, } diff --git a/src/sql/src/parsers/delete_parser.rs b/src/sql/src/parsers/delete_parser.rs index f538262c2c..1d52f1839f 100644 --- a/src/sql/src/parsers/delete_parser.rs +++ b/src/sql/src/parsers/delete_parser.rs @@ -31,7 +31,7 @@ impl<'a> ParserContext<'a> { match spstatement { SpStatement::Delete { .. } => { - Ok(Statement::Delete(Box::new(Delete::try_from(spstatement)?))) + Ok(Statement::Delete(Box::new(Delete { inner: spstatement }))) } unexp => error::UnsupportedSnafu { sql: self.sql.to_string(), diff --git a/src/sql/src/statements/delete.rs b/src/sql/src/statements/delete.rs index bb9661f3a5..47ba87a011 100644 --- a/src/sql/src/statements/delete.rs +++ b/src/sql/src/statements/delete.rs @@ -12,58 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use sqlparser::ast::{Expr, ObjectName, Statement, TableFactor}; - -use crate::error::{Error, InvalidSqlSnafu, Result}; +use sqlparser::ast::Statement; #[derive(Debug, Clone, PartialEq, Eq)] pub struct Delete { - table_name: ObjectName, - selection: Option, -} - -impl Delete { - pub fn table_name(&self) -> &ObjectName { - &self.table_name - } - - pub fn selection(&self) -> &Option { - &self.selection - } -} - -impl TryFrom for Delete { - type Error = Error; - - fn try_from(stmt: Statement) -> Result { - match stmt { - Statement::Delete { - table_name, - using, - selection, - returning, - } => { - if using.is_some() || returning.is_some() { - return InvalidSqlSnafu { - msg: "delete sql isn't support using and returning.".to_string(), - } - .fail(); - } - match table_name { - TableFactor::Table { name, .. } => Ok(Delete { - table_name: name, - selection, - }), - _ => InvalidSqlSnafu { - msg: "can't find table name, tableFactor is not Table type".to_string(), - } - .fail(), - } - } - unexp => InvalidSqlSnafu { - msg: format!("Not expected to be {unexp}"), - } - .fail(), - } - } + pub inner: Statement, } diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 9f09d9169e..684d11e1d2 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -80,6 +80,7 @@ impl TryFrom<&Statement> for DfStatement { Statement::Query(query) => SpStatement::Query(Box::new(query.inner.clone())), Statement::Explain(explain) => explain.inner.clone(), Statement::Insert(insert) => insert.inner.clone(), + Statement::Delete(delete) => delete.inner.clone(), _ => { return ConvertToDfStatementSnafu { statement: format!("{s:?}"), diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 5561c5613a..0376db64cc 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -18,7 +18,6 @@ use common_error::prelude::*; use common_recordbatch::error::Error as RecordBatchError; use datafusion::error::DataFusionError; use datatypes::arrow::error::ArrowError; -use datatypes::prelude::ConcreteDataType; pub type Result = std::result::Result; @@ -115,19 +114,6 @@ pub enum Error { value: String, backtrace: Backtrace, }, - - #[snafu(display( - "Failed to cast vector of type '{:?}' to type '{:?}', source: {}", - from_type, - to_type, - source - ))] - CastVector { - from_type: ConcreteDataType, - to_type: ConcreteDataType, - #[snafu(backtrace)] - source: datatypes::error::Error, - }, } impl ErrorExt for Error { @@ -142,9 +128,7 @@ impl ErrorExt for Error { } Error::TablesRecordBatch { .. } => StatusCode::Unexpected, Error::ColumnExists { .. } => StatusCode::TableColumnExists, - Error::SchemaBuild { source, .. } | Error::CastVector { source, .. } => { - source.status_code() - } + Error::SchemaBuild { source, .. } => source.status_code(), Error::TableOperation { source } => source.status_code(), Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound, Error::RegionSchemaMismatch { .. } => StatusCode::StorageUnavailable, diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 957a62eaf0..92a26dde2f 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -13,7 +13,6 @@ // limitations under the License. //! Table and TableEngine requests -mod insert; use std::collections::HashMap; use std::str::FromStr; @@ -22,7 +21,6 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use datatypes::prelude::VectorRef; use datatypes::schema::{ColumnSchema, RawSchema}; -pub use insert::InsertRequest; use serde::{Deserialize, Serialize}; use store_api::storage::RegionNumber; @@ -174,6 +172,15 @@ pub struct DropTableRequest { pub table_name: String, } +#[derive(Debug)] +pub struct InsertRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub columns_values: HashMap, + pub region_number: RegionNumber, +} + /// Delete (by primary key) request #[derive(Debug)] pub struct DeleteRequest { diff --git a/src/table/src/requests/insert.rs b/src/table/src/requests/insert.rs deleted file mode 100644 index ddf6f8c271..0000000000 --- a/src/table/src/requests/insert.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use common_recordbatch::RecordBatch; -use datafusion_common::ResolvedTableReference; -use datatypes::prelude::VectorRef; -use datatypes::schema::SchemaRef; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; - -use crate::error::{CastVectorSnafu, ColumnNotExistsSnafu, Result}; - -#[derive(Debug)] -pub struct InsertRequest { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub columns_values: HashMap, - pub region_number: RegionNumber, -} - -impl InsertRequest { - pub fn try_from_recordbatch( - table_name: &ResolvedTableReference, - table_schema: SchemaRef, - recordbatch: RecordBatch, - ) -> Result { - let mut columns_values = HashMap::with_capacity(recordbatch.num_columns()); - - // column schemas in recordbatch must match its vectors, otherwise it's corrupted - for (vector_schema, vector) in recordbatch - .schema - .column_schemas() - .iter() - .zip(recordbatch.columns().iter()) - { - let column_name = &vector_schema.name; - let column_schema = table_schema - .column_schema_by_name(column_name) - .with_context(|| ColumnNotExistsSnafu { - table_name: table_name.table.to_string(), - column_name, - })?; - let vector = if vector_schema.data_type != column_schema.data_type { - vector - .cast(&column_schema.data_type) - .with_context(|_| CastVectorSnafu { - from_type: vector.data_type(), - to_type: column_schema.data_type.clone(), - })? - } else { - vector.clone() - }; - - columns_values.insert(column_name.clone(), vector); - } - - Ok(InsertRequest { - catalog_name: table_name.catalog.to_string(), - schema_name: table_name.schema.to_string(), - table_name: table_name.table.to_string(), - columns_values, - region_number: 0, - }) - } -} diff --git a/tests/cases/standalone/delete/delete_invalid.result b/tests/cases/standalone/delete/delete_invalid.result index 2e26936898..64aa607702 100644 --- a/tests/cases/standalone/delete/delete_invalid.result +++ b/tests/cases/standalone/delete/delete_invalid.result @@ -8,19 +8,19 @@ Affected Rows: 3 delete from monitor where cpu = 66.6 and ts = 1655276557000; -Error: 1004(InvalidArguments), Missing column host in write batch +Affected Rows: 1 delete from monitor where host = 'host1' or ts = 1655276557000; -Error: 1004(InvalidArguments), Not support SQL, error: Not support sql expr:host = 'host1' OR ts = 1655276557000,correct format is tagkey1 = tagvalue1 and ts = value +Affected Rows: 0 delete from monitor where host = 'host1' or ts != 1655276557000; -Error: 1004(InvalidArguments), Not support SQL, error: Not support sql expr:host = 'host1' OR ts <> 1655276557000,correct format is tagkey1 = tagvalue1 and ts = value +Affected Rows: 2 delete from monitor where ts != 1655276557000; -Error: 1004(InvalidArguments), Not support SQL, error: Not support sql expr:ts <> 1655276557000,correct format is tagkey1 = tagvalue1 and ts = value +Affected Rows: 0 drop table monitor;