From 17daf4cdffda281f8c539f892cf1c5930ff04f41 Mon Sep 17 00:00:00 2001 From: LFC Date: Mon, 24 Apr 2023 12:07:50 +0800 Subject: [PATCH] feat: support "delete" in distributed mode (#1441) * feat: support "delete" in distributed mode * fix: resolve PR comments --- Cargo.lock | 2 +- src/api/Cargo.toml | 2 +- src/client/src/database.rs | 15 +- src/client/src/metrics.rs | 1 + src/common/grpc-expr/src/delete.rs | 113 +++++++ src/common/grpc-expr/src/error.rs | 11 +- src/common/grpc-expr/src/insert.rs | 2 +- src/common/grpc-expr/src/lib.rs | 1 + src/datanode/src/error.rs | 7 + src/datanode/src/instance/grpc.rs | 112 ++++++- src/frontend/src/error.rs | 10 + src/frontend/src/instance/distributed.rs | 49 ++- src/frontend/src/instance/distributed/grpc.rs | 1 + src/frontend/src/instance/grpc.rs | 301 +++++++++++++----- src/frontend/src/table.rs | 105 +++++- src/frontend/src/table/delete.rs | 110 +++++++ src/frontend/src/table/insert.rs | 81 ++--- src/frontend/src/table/scan.rs | 6 +- src/frontend/src/tests/instance_test.rs | 3 +- src/partition/src/columns.rs | 10 +- src/partition/src/error.rs | 4 + src/partition/src/manager.rs | 6 +- src/partition/src/partition.rs | 22 +- src/partition/src/range.rs | 4 +- src/partition/src/splitter.rs | 85 ++++- src/servers/tests/mod.rs | 2 +- src/table/src/error.rs | 11 +- .../standalone/common/delete/delete.result | 62 ++++ .../cases/standalone/common/delete/delete.sql | 26 ++ tests/cases/standalone/delete/delete.result | 35 -- tests/cases/standalone/delete/delete.sql | 11 - .../standalone/delete/delete_invalid.result | 28 -- .../standalone/delete/delete_invalid.sql | 13 - 33 files changed, 965 insertions(+), 286 deletions(-) create mode 100644 src/common/grpc-expr/src/delete.rs create mode 100644 src/frontend/src/table/delete.rs create mode 100644 tests/cases/standalone/common/delete/delete.result create mode 100644 tests/cases/standalone/common/delete/delete.sql delete mode 100644 tests/cases/standalone/delete/delete.result delete mode 100644 tests/cases/standalone/delete/delete.sql delete mode 100644 tests/cases/standalone/delete/delete_invalid.result delete mode 100644 tests/cases/standalone/delete/delete_invalid.sql diff --git a/Cargo.lock b/Cargo.lock index c79900ea15..e95d78654b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3821,7 +3821,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0bebe5f69c91cdfbce85cb8f45f9fcd28185261c#0bebe5f69c91cdfbce85cb8f45f9fcd28185261c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=57e6a032db70264ec394cbb8a2f327ad33705d76#57e6a032db70264ec394cbb8a2f327ad33705d76" dependencies = [ "prost", "tonic 0.9.1", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 5b6c5af48e..5e068c05fe 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0bebe5f69c91cdfbce85cb8f45f9fcd28185261c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "57e6a032db70264ec394cbb8a2f327ad33705d76" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 9241c8edaa..03c9839949 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -18,8 +18,8 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ greptime_response, AffectedRows, AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, - DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery, QueryRequest, - RequestHeader, + DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery, + QueryRequest, RequestHeader, }; use arrow_flight::{FlightData, Ticket}; use common_error::prelude::*; @@ -108,6 +108,15 @@ impl Database { pub async fn insert(&self, request: InsertRequest) -> Result { let _timer = timer!(metrics::METRIC_GRPC_INSERT); + self.handle(Request::Insert(request)).await + } + + pub async fn delete(&self, request: DeleteRequest) -> Result { + let _timer = timer!(metrics::METRIC_GRPC_DELETE); + self.handle(Request::Delete(request)).await + } + + async fn handle(&self, request: Request) -> Result { let mut client = self.client.make_database_client()?.inner; let request = GreptimeRequest { header: Some(RequestHeader { @@ -116,7 +125,7 @@ impl Database { authorization: self.ctx.auth_header.clone(), dbname: self.dbname.clone(), }), - request: Some(Request::Insert(request)), + request: Some(request), }; let response = client .handle(request) diff --git a/src/client/src/metrics.rs b/src/client/src/metrics.rs index 33e824995a..370b17eb58 100644 --- a/src/client/src/metrics.rs +++ b/src/client/src/metrics.rs @@ -16,6 +16,7 @@ pub const METRIC_GRPC_CREATE_TABLE: &str = "grpc.create_table"; pub const METRIC_GRPC_PROMQL_RANGE_QUERY: &str = "grpc.promql.range_query"; pub const METRIC_GRPC_INSERT: &str = "grpc.insert"; +pub const METRIC_GRPC_DELETE: &str = "grpc.delete"; pub const METRIC_GRPC_SQL: &str = "grpc.sql"; pub const METRIC_GRPC_LOGICAL_PLAN: &str = "grpc.logical_plan"; pub const METRIC_GRPC_ALTER: &str = "grpc.alter"; diff --git a/src/common/grpc-expr/src/delete.rs b/src/common/grpc-expr/src/delete.rs new file mode 100644 index 0000000000..3ebe16a7a0 --- /dev/null +++ b/src/common/grpc-expr/src/delete.rs @@ -0,0 +1,113 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::helper::ColumnDataTypeWrapper; +use api::v1::{Column, DeleteRequest as GrpcDeleteRequest}; +use datatypes::data_type::DataType; +use datatypes::prelude::ConcreteDataType; +use snafu::{ensure, ResultExt}; +use table::requests::DeleteRequest; + +use crate::error::{ColumnDataTypeSnafu, IllegalDeleteRequestSnafu, Result}; +use crate::insert::add_values_to_builder; + +pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result { + let row_count = request.row_count as usize; + + let mut key_column_values = HashMap::with_capacity(request.key_columns.len()); + for Column { + column_name, + values, + null_mask, + datatype, + .. + } in request.key_columns + { + let Some(values) = values else { continue }; + + let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype) + .context(ColumnDataTypeSnafu)? + .into(); + + let vector_builder = &mut datatype.create_mutable_vector(row_count); + + add_values_to_builder(vector_builder, values, row_count, null_mask)?; + + ensure!( + key_column_values + .insert(column_name.clone(), vector_builder.to_vector()) + .is_none(), + IllegalDeleteRequestSnafu { + reason: format!("Duplicated column '{column_name}' in delete request.") + } + ); + } + + Ok(DeleteRequest { key_column_values }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::column::Values; + use api::v1::ColumnDataType; + use datatypes::prelude::{ScalarVector, VectorRef}; + use datatypes::vectors::{Int32Vector, StringVector}; + + use super::*; + + #[test] + fn test_to_table_delete_request() { + let grpc_request = GrpcDeleteRequest { + table_name: "foo".to_string(), + region_number: 0, + key_columns: vec![ + Column { + column_name: "id".to_string(), + values: Some(Values { + i32_values: vec![1, 2, 3], + ..Default::default() + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + Column { + column_name: "name".to_string(), + values: Some(Values { + string_values: vec!["a".to_string(), "b".to_string(), "c".to_string()], + ..Default::default() + }), + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + ], + row_count: 3, + }; + + let mut request = to_table_delete_request(grpc_request).unwrap(); + + assert_eq!( + Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef, + request.key_column_values.remove("id").unwrap() + ); + assert_eq!( + Arc::new(StringVector::from_slice(&["a", "b", "c"])) as VectorRef, + request.key_column_values.remove("name").unwrap() + ); + assert!(request.key_column_values.is_empty()); + } +} diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index 07e2c42723..893fbf1d26 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -34,6 +34,9 @@ pub enum Error { #[snafu(display("Illegal insert data"))] IllegalInsertData { location: Location }, + #[snafu(display("Illegal delete request, reason: {reason}"))] + IllegalDeleteRequest { reason: String, location: Location }, + #[snafu(display("Column datatype error, source: {}", source))] ColumnDataType { #[snafu(backtrace)] @@ -95,9 +98,11 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, - Error::DecodeInsert { .. } | Error::IllegalInsertData { .. } => { - StatusCode::InvalidArguments - } + + Error::DecodeInsert { .. } + | Error::IllegalInsertData { .. } + | Error::IllegalDeleteRequest { .. } => StatusCode::InvalidArguments, + Error::ColumnDataType { .. } => StatusCode::Internal, Error::DuplicatedTimestampColumn { .. } | Error::MissingTimestampColumn { .. } => { StatusCode::InvalidArguments diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index ccd637caa3..975ec713d5 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -307,7 +307,7 @@ pub fn to_table_insert_request( }) } -fn add_values_to_builder( +pub(crate) fn add_values_to_builder( builder: &mut Box, values: Values, row_count: usize, diff --git a/src/common/grpc-expr/src/lib.rs b/src/common/grpc-expr/src/lib.rs index 80833e1f31..e46b2ad400 100644 --- a/src/common/grpc-expr/src/lib.rs +++ b/src/common/grpc-expr/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. mod alter; +pub mod delete; pub mod error; pub mod insert; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6f25002a7d..9cd8de1e76 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -282,6 +282,12 @@ pub enum Error { source: common_grpc_expr::error::Error, }, + #[snafu(display("Failed to convert delete expr to request: {}", source))] + DeleteExprToRequest { + #[snafu(backtrace)] + source: common_grpc_expr::error::Error, + }, + #[snafu(display("Failed to parse SQL, source: {}", source))] ParseSql { #[snafu(backtrace)] @@ -453,6 +459,7 @@ impl ErrorExt for Error { AlterExprToRequest { source, .. } | CreateExprToRequest { source } + | DeleteExprToRequest { source } | InsertData { source } => source.status_code(), ConvertSchema { source, .. } | VectorComputation { source } => source.status_code(), diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 5656ef5a9d..d3d452748d 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -15,7 +15,7 @@ use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request as GrpcRequest; use api::v1::query_request::Query; -use api::v1::{CreateDatabaseExpr, DdlRequest, InsertRequest}; +use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequest}; use async_trait::async_trait; use common_query::Output; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; @@ -26,11 +26,13 @@ use session::context::{QueryContext, QueryContextRef}; use snafu::prelude::*; use sql::statements::statement::Statement; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; +use table::engine::TableReference; use table::requests::CreateDatabaseRequest; use crate::error::{ - self, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, PlanStatementSnafu, - Result, + self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu, + ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result, + TableNotFoundSnafu, }; use crate::instance::Instance; @@ -104,20 +106,47 @@ impl Instance { let catalog = &ctx.current_catalog(); let schema = &ctx.current_schema(); let table_name = &request.table_name.clone(); + let table_ref = TableReference::full(catalog, schema, table_name); + let table = self .catalog_manager .table(catalog, schema, table_name) .await - .context(error::CatalogSnafu)? - .context(error::TableNotFoundSnafu { table_name })?; + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request) .context(error::InsertDataSnafu)?; - let affected_rows = table - .insert(request) + let affected_rows = table.insert(request).await.with_context(|_| InsertSnafu { + table_name: table_ref.to_string(), + })?; + Ok(Output::AffectedRows(affected_rows)) + } + + async fn handle_delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result { + let catalog = &ctx.current_catalog(); + let schema = &ctx.current_schema(); + let table_name = &request.table_name.clone(); + let table_ref = TableReference::full(catalog, schema, table_name); + + let table = self + .catalog_manager + .table(catalog, schema, table_name) .await - .context(error::InsertSnafu { table_name })?; + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let request = common_grpc_expr::delete::to_table_delete_request(request) + .context(DeleteExprToRequestSnafu)?; + + let affected_rows = table.delete(request).await.with_context(|_| DeleteSnafu { + table_name: table_ref.to_string(), + })?; Ok(Output::AffectedRows(affected_rows)) } @@ -142,6 +171,7 @@ impl GrpcQueryHandler for Instance { async fn do_query(&self, request: GrpcRequest, ctx: QueryContextRef) -> Result { match request { GrpcRequest::Insert(request) => self.handle_insert(request, ctx).await, + GrpcRequest::Delete(request) => self.handle_delete(request, ctx).await, GrpcRequest::Query(query_request) => { let query = query_request .query @@ -334,6 +364,72 @@ mod test { assert_eq!(recordbatches.pretty_print().unwrap(), expected); } + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_delete() { + let instance = MockInstance::new("test_handle_delete").await; + let instance = instance.inner(); + test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype()) + .await + .unwrap(); + + let query = GrpcRequest::Query(QueryRequest { + query: Some(Query::Sql( + "INSERT INTO demo(host, cpu, memory, ts) VALUES \ + ('host1', 66.6, 1024, 1672201025000),\ + ('host2', 88.8, 333.3, 1672201026000),\ + ('host3', 88.8, 333.3, 1672201026000)" + .to_string(), + )), + }); + let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(3))); + + let request = DeleteRequest { + table_name: "demo".to_string(), + region_number: 0, + key_columns: vec![ + Column { + column_name: "host".to_string(), + values: Some(Values { + string_values: vec!["host2".to_string()], + ..Default::default() + }), + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672201026000], + ..Default::default() + }), + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 1, + }; + + let request = GrpcRequest::Delete(request); + let output = instance + .do_query(request, QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+-------+------+ +| ts | host | cpu | ++---------------------+-------+------+ +| 2022-12-28T04:17:05 | host1 | 66.6 | +| 2022-12-28T04:17:06 | host3 | 88.8 | ++---------------------+-------+------+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + #[tokio::test(flavor = "multi_thread")] async fn test_handle_query() { let instance = MockInstance::new("test_handle_query").await; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 91b29a6bbd..6c4fa9ab40 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -224,6 +224,15 @@ pub enum Error { source: common_grpc_expr::error::Error, }, + #[snafu(display( + "Failed to convert GRPC DeleteRequest to table DeleteRequest, source: {}", + source + ))] + ToTableDeleteRequest { + #[snafu(backtrace)] + source: common_grpc_expr::error::Error, + }, + #[snafu(display("Failed to find catalog by name: {}", catalog_name))] CatalogNotFound { catalog_name: String, @@ -546,6 +555,7 @@ impl ErrorExt for Error { } Error::BuildCreateExprOnInsertion { source } | Error::ToTableInsertRequest { source } + | Error::ToTableDeleteRequest { source } | Error::FindNewColumnsOnInsertion { source } => source.status_code(), Error::ExecuteStatement { source, .. } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 5dec7038ad..f2b28ac6e3 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{ - column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, FlushTableExpr, - InsertRequest, TableId, + column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, + FlushTableExpr, InsertRequest, TableId, }; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue}; @@ -52,6 +52,7 @@ use sql::ast::{Ident, Value as SqlValue}; use sql::statements::create::{PartitionEntry, Partitions}; use sql::statements::statement::Statement; use sql::statements::{self, sql_value_to_value}; +use table::engine::TableReference; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; use table::table::AlterContext; @@ -63,8 +64,8 @@ use crate::error::{ self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu, DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu, - TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableInsertRequestSnafu, - UnrecognizedTableOptionSnafu, + TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, + ToTableInsertRequestSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; use crate::table::DistTable; @@ -516,10 +517,10 @@ impl DistInstance { .context(RequestMetaSnafu) } - // TODO(LFC): Refactor insertion implementation for DistTable, - // GRPC InsertRequest to Table InsertRequest, than split Table InsertRequest, than assemble each GRPC InsertRequest, is rather inefficient, - // should operate on GRPC InsertRequest directly. - // Also remember to check the "region_number" carried in InsertRequest, too. + // TODO(LFC): Refactor GRPC insertion and deletion implementation here, + // Take insertion as an example. GRPC insertion is converted to Table InsertRequest here, + // than split the Table InsertRequest in DistTable, than assemble each GRPC InsertRequest there. + // Rather inefficient, should operate on GRPC InsertRequest directly. async fn handle_dist_insert( &self, request: InsertRequest, @@ -528,12 +529,16 @@ impl DistInstance { let catalog = &ctx.current_catalog(); let schema = &ctx.current_schema(); let table_name = &request.table_name; + let table_ref = TableReference::full(catalog, schema, table_name); + let table = self .catalog_manager .table(catalog, schema, table_name) .await .context(CatalogSnafu)? - .context(TableNotFoundSnafu { table_name })?; + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request) .context(ToTableInsertRequestSnafu)?; @@ -542,6 +547,32 @@ impl DistInstance { Ok(Output::AffectedRows(affected_rows)) } + async fn handle_dist_delete( + &self, + request: DeleteRequest, + ctx: QueryContextRef, + ) -> Result { + let catalog = &ctx.current_catalog(); + let schema = &ctx.current_schema(); + let table_name = &request.table_name; + let table_ref = TableReference::full(catalog, schema, table_name); + + let table = self + .catalog_manager + .table(catalog, schema, table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let request = common_grpc_expr::delete::to_table_delete_request(request) + .context(ToTableDeleteRequestSnafu)?; + + let affected_rows = table.delete(request).await.context(TableSnafu)?; + Ok(Output::AffectedRows(affected_rows)) + } + #[cfg(test)] pub(crate) fn catalog_manager(&self) -> Arc { self.catalog_manager.clone() diff --git a/src/frontend/src/instance/distributed/grpc.rs b/src/frontend/src/instance/distributed/grpc.rs index bfcb0233f4..1befeae4f6 100644 --- a/src/frontend/src/instance/distributed/grpc.rs +++ b/src/frontend/src/instance/distributed/grpc.rs @@ -37,6 +37,7 @@ impl GrpcQueryHandler for DistInstance { async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { match request { Request::Insert(request) => self.handle_dist_insert(request, ctx).await, + Request::Delete(request) => self.handle_dist_delete(request, ctx).await, Request::Query(_) => { unreachable!("Query should have been handled directly in Frontend Instance!") } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index f74e938ab7..06960b8d43 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -74,9 +74,8 @@ impl GrpcQueryHandler for Instance { } } } - Request::Ddl(request) => { - let query = Request::Ddl(request); - GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await? + Request::Ddl(_) | Request::Delete(_) => { + GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx).await? } }; Ok(output) @@ -91,8 +90,8 @@ mod test { use api::v1::ddl_request::Expr as DdlExpr; use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, DdlRequest, DropTableExpr, FlushTableExpr, - InsertRequest, QueryRequest, + CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr, + FlushTableExpr, InsertRequest, QueryRequest, }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_catalog::consts::MITO_ENGINE; @@ -241,11 +240,11 @@ mod test { } #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_insert_and_query() { + async fn test_distributed_insert_delete_and_query() { common_telemetry::init_default_ut_logging(); let instance = - tests::create_distributed_instance("test_distributed_insert_and_query").await; + tests::create_distributed_instance("test_distributed_insert_delete_and_query").await; let frontend = instance.frontend.as_ref(); let table_name = "my_dist_table"; @@ -253,6 +252,7 @@ mod test { r" CREATE TABLE {table_name} ( a INT, + b STRING PRIMARY KEY, ts TIMESTAMP, TIME INDEX (ts) ) PARTITION BY RANGE COLUMNS(a) ( @@ -264,7 +264,7 @@ CREATE TABLE {table_name} ( ); create_table(frontend, sql).await; - test_insert_and_query_on_existing_table(frontend, table_name).await; + test_insert_delete_and_query_on_existing_table(frontend, table_name).await; verify_data_distribution( &instance, @@ -273,48 +273,52 @@ CREATE TABLE {table_name} ( ( 0u32, "\ -+---------------------+---+ -| ts | a | -+---------------------+---+ -| 2023-01-01T07:26:12 | 1 | -| 2023-01-01T07:26:14 | | -+---------------------+---+", ++---------------------+---+-------------------+ +| ts | a | b | ++---------------------+---+-------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | ++---------------------+---+-------------------+", ), ( 1u32, "\ -+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:13 | 11 | -+---------------------+----+", ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | ++---------------------+----+-------------------+", ), ( 2u32, "\ -+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:15 | 20 | -| 2023-01-01T07:26:16 | 22 | -+---------------------+----+", ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | ++---------------------+----+-------------------+", ), ( 3u32, "\ -+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:17 | 50 | -| 2023-01-01T07:26:18 | 55 | -| 2023-01-01T07:26:19 | 99 | -+---------------------+----+", ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | ++---------------------+----+-------------------+", ), ]), ) .await; - test_insert_and_query_on_auto_created_table(frontend).await; + test_insert_delete_and_query_on_auto_created_table(frontend).await; // Auto created table has only one region. verify_data_distribution( @@ -323,16 +327,14 @@ CREATE TABLE {table_name} ( HashMap::from([( 0u32, "\ -+---------------------+---+ -| ts | a | -+---------------------+---+ -| 2023-01-01T07:26:15 | 4 | -| 2023-01-01T07:26:16 | | -| 2023-01-01T07:26:17 | 6 | -| 2023-01-01T07:26:18 | | -| 2023-01-01T07:26:19 | | -| 2023-01-01T07:26:20 | | -+---------------------+---+", ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-01T07:26:16 | | | +| 2023-01-01T07:26:17 | 6 | | +| 2023-01-01T07:26:18 | | x | +| 2023-01-01T07:26:20 | | z | ++---------------------+---+---+", )]), ) .await; @@ -347,12 +349,12 @@ CREATE TABLE {table_name} ( let instance = &standalone.instance; let table_name = "my_table"; - let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))"); + let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))"); create_table(instance, sql).await; - test_insert_and_query_on_existing_table(instance, table_name).await; + test_insert_delete_and_query_on_existing_table(instance, table_name).await; - test_insert_and_query_on_auto_created_table(instance).await + test_insert_delete_and_query_on_auto_created_table(instance).await } #[tokio::test(flavor = "multi_thread")] @@ -379,7 +381,7 @@ CREATE TABLE {table_name} ( ); create_table(frontend, sql).await; - test_insert_and_query_on_existing_table(frontend, table_name).await; + test_insert_delete_and_query_on_existing_table(frontend, table_name).await; flush_table(frontend, "greptime", "public", table_name, None).await; // Wait for previous task finished @@ -434,11 +436,11 @@ CREATE TABLE {table_name} ( let data_tmp_dir = standalone.data_tmp_dir(); let table_name = "my_table"; - let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))"); + let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))"); create_table(instance, sql).await; - test_insert_and_query_on_existing_table(instance, table_name).await; + test_insert_delete_and_query_on_existing_table(instance, table_name).await; let table_id = 1024; let region_id = 0; @@ -486,33 +488,55 @@ CREATE TABLE {table_name} ( assert!(matches!(output, Output::AffectedRows(0))); } - async fn test_insert_and_query_on_existing_table(instance: &Instance, table_name: &str) { + async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) { + let ts_millisecond_values = vec![ + 1672557972000, + 1672557973000, + 1672557974000, + 1672557975000, + 1672557976000, + 1672557977000, + 1672557978000, + 1672557979000, + 1672557980000, + 1672557981000, + 1672557982000, + 1672557983000, + 1672557984000, + 1672557985000, + 1672557986000, + 1672557987000, + ]; let insert = InsertRequest { table_name: table_name.to_string(), columns: vec![ Column { column_name: "a".to_string(), values: Some(Values { - i32_values: vec![1, 11, 20, 22, 50, 55, 99], + i32_values: vec![1, 2, 3, 4, 5, 11, 12, 20, 21, 22, 23, 50, 51, 52, 53], ..Default::default() }), - null_mask: vec![4], + null_mask: vec![32, 0], semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Int32 as i32, }, + Column { + column_name: "b".to_string(), + values: Some(Values { + string_values: ts_millisecond_values + .iter() + .map(|x| format!("ts: {x}")) + .collect(), + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, + ..Default::default() + }, Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![ - 1672557972000, - 1672557973000, - 1672557974000, - 1672557975000, - 1672557976000, - 1672557977000, - 1672557978000, - 1672557979000, - ], + ts_millisecond_values, ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -520,35 +544,113 @@ CREATE TABLE {table_name} ( ..Default::default() }, ], - row_count: 8, + row_count: 16, ..Default::default() }; - - let request = Request::Insert(insert); - let output = query(instance, request).await; - assert!(matches!(output, Output::AffectedRows(8))); + let output = query(instance, Request::Insert(insert)).await; + assert!(matches!(output, Output::AffectedRows(16))); let request = Request::Query(QueryRequest { query: Some(Query::Sql(format!( - "SELECT ts, a FROM {table_name} ORDER BY ts" + "SELECT ts, a, b FROM {table_name} ORDER BY ts" ))), }); + let output = query(instance, request.clone()).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | +| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | +| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | +| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | +| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | ++---------------------+----+-------------------+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + + let delete = DeleteRequest { + table_name: table_name.to_string(), + region_number: 0, + key_columns: vec![ + Column { + column_name: "a".to_string(), + semantic_type: SemanticType::Field as i32, + values: Some(Values { + i32_values: vec![2, 12, 22, 52], + ..Default::default() + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + Column { + column_name: "b".to_string(), + semantic_type: SemanticType::Tag as i32, + values: Some(Values { + string_values: vec![ + "ts: 1672557973000".to_string(), + "ts: 1672557979000".to_string(), + "ts: 1672557982000".to_string(), + "ts: 1672557986000".to_string(), + ], + ..Default::default() + }), + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "ts".to_string(), + semantic_type: SemanticType::Timestamp as i32, + values: Some(Values { + ts_millisecond_values: vec![ + 1672557973000, + 1672557979000, + 1672557982000, + 1672557986000, + ], + ..Default::default() + }), + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 4, + }; + let output = query(instance, Request::Delete(delete)).await; + assert!(matches!(output, Output::AffectedRows(4))); + let output = query(instance, request).await; let Output::Stream(stream) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ -+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:12 | 1 | -| 2023-01-01T07:26:13 | 11 | -| 2023-01-01T07:26:14 | | -| 2023-01-01T07:26:15 | 20 | -| 2023-01-01T07:26:16 | 22 | -| 2023-01-01T07:26:17 | 50 | -| 2023-01-01T07:26:18 | 55 | -| 2023-01-01T07:26:19 | 99 | -+---------------------+----+"; ++---------------------+----+-------------------+ +| ts | a | b | ++---------------------+----+-------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | ++---------------------+----+-------------------+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); } @@ -583,7 +685,7 @@ CREATE TABLE {table_name} ( for (region, dn) in region_to_dn_map.iter() { let stmt = QueryLanguageParser::parse_sql(&format!( - "SELECT ts, a FROM {table_name} ORDER BY ts" + "SELECT ts, a, b FROM {table_name} ORDER BY ts" )) .unwrap(); let dn = instance.datanodes.get(dn).unwrap(); @@ -603,7 +705,7 @@ CREATE TABLE {table_name} ( } } - async fn test_insert_and_query_on_auto_created_table(instance: &Instance) { + async fn test_insert_delete_and_query_on_auto_created_table(instance: &Instance) { let insert = InsertRequest { table_name: "auto_created_table".to_string(), columns: vec![ @@ -675,7 +777,7 @@ CREATE TABLE {table_name} ( "SELECT ts, a, b FROM auto_created_table".to_string(), )), }); - let output = query(instance, request).await; + let output = query(instance, request.clone()).await; let Output::Stream(stream) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ @@ -688,6 +790,39 @@ CREATE TABLE {table_name} ( | 2023-01-01T07:26:18 | | x | | 2023-01-01T07:26:19 | | | | 2023-01-01T07:26:20 | | z | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + + let delete = DeleteRequest { + table_name: "auto_created_table".to_string(), + region_number: 0, + key_columns: vec![Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672557975000, 1672557979000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }], + row_count: 2, + }; + + let output = query(instance, Request::Delete(delete)).await; + assert!(matches!(output, Output::AffectedRows(2))); + + let output = query(instance, request).await; + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-01T07:26:16 | | | +| 2023-01-01T07:26:17 | 6 | | +| 2023-01-01T07:26:18 | | x | +| 2023-01-01T07:26:20 | | z | +---------------------+---+---+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 2531d382e9..a10b3a1d01 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::iter; use std::sync::Arc; use api::v1::AlterExpr; @@ -36,18 +37,23 @@ use datafusion_common::DataFusionError; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use meta_client::rpc::TableName; use partition::manager::PartitionRuleManagerRef; +use partition::splitter::WriteSplitter; use snafu::prelude::*; +use store_api::storage::RegionNumber; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; -use table::requests::{AlterTableRequest, InsertRequest}; +use table::requests::{AlterTableRequest, DeleteRequest, InsertRequest}; use table::table::AlterContext; use table::{meter_insert_request, Table}; use tokio::sync::RwLock; use crate::datanode::DatanodeClients; -use crate::error::{self, Result}; +use crate::error::{self, FindDatanodeSnafu, FindTableRouteSnafu, Result}; +use crate::table::delete::to_grpc_delete_request; +use crate::table::insert::to_grpc_insert_request; use crate::table::scan::{DatanodeInstance, TableScanPlan}; +mod delete; pub mod insert; pub(crate) mod scan; @@ -84,8 +90,15 @@ impl Table for DistTable { .map_err(BoxedError::new) .context(TableOperationSnafu)?; + let inserts = splits + .into_iter() + .map(|(region_number, insert)| to_grpc_insert_request(region_number, insert)) + .collect::>>() + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + let output = self - .dist_insert(splits) + .dist_insert(inserts) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; @@ -155,6 +168,52 @@ impl Table for DistTable { .map_err(BoxedError::new) .context(TableOperationSnafu) } + + async fn delete(&self, request: DeleteRequest) -> table::Result { + let partition_rule = self + .partition_manager + .find_table_partition_rule(&self.table_name) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + + let schema = self.schema(); + let time_index = &schema + .timestamp_column() + .with_context(|| table::error::MissingTimeIndexColumnSnafu { + table_name: self.table_name.to_string(), + })? + .name; + + let table_info = self.table_info(); + let key_column_names = table_info + .meta + .row_key_column_names() + .chain(iter::once(time_index)) + .collect::>(); + + let requests = WriteSplitter::with_partition_rule(partition_rule) + .split_delete(request, key_column_names) + .map_err(BoxedError::new) + .and_then(|requests| { + requests + .into_iter() + .map(|(region_number, request)| { + to_grpc_delete_request(&self.table_name, region_number, request) + }) + .collect::>>() + .map_err(BoxedError::new) + }) + .context(TableOperationSnafu)?; + + let output = self + .dist_delete(requests) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + let Output::AffectedRows(rows) = output else { unreachable!() }; + Ok(rows) + } } impl DistTable { @@ -274,6 +333,46 @@ impl DistTable { } Ok(()) } + + async fn find_datanode_instances( + &self, + regions: &[RegionNumber], + ) -> Result> { + let table_name = &self.table_name; + let route = self + .partition_manager + .find_table_route(table_name) + .await + .with_context(|_| FindTableRouteSnafu { + table_name: table_name.to_string(), + })?; + + let datanodes = regions + .iter() + .map(|&n| { + let region_id = n as u64; + route + .region_routes + .iter() + .find_map(|x| { + if x.region.id == region_id { + x.leader_peer.clone() + } else { + None + } + }) + .context(FindDatanodeSnafu { region: region_id }) + }) + .collect::>>()?; + + let mut instances = Vec::with_capacity(datanodes.len()); + for datanode in datanodes { + let client = self.datanode_clients.get_client(&datanode).await; + let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); + instances.push(DatanodeInstance::new(Arc::new(self.clone()) as _, db)); + } + Ok(instances) + } } fn project_schema(table_schema: SchemaRef, projection: Option<&Vec>) -> SchemaRef { diff --git a/src/frontend/src/table/delete.rs b/src/frontend/src/table/delete.rs new file mode 100644 index 0000000000..481a72cc8e --- /dev/null +++ b/src/frontend/src/table/delete.rs @@ -0,0 +1,110 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::DeleteRequest as GrpcDeleteRequest; +use common_query::Output; +use futures::future; +use meta_client::rpc::TableName; +use snafu::ResultExt; +use store_api::storage::RegionNumber; +use table::requests::DeleteRequest; + +use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result}; +use crate::table::insert::to_grpc_columns; +use crate::table::DistTable; + +impl DistTable { + pub(super) async fn dist_delete(&self, requests: Vec) -> Result { + let regions = requests.iter().map(|x| x.region_number).collect::>(); + let instances = self.find_datanode_instances(®ions).await?; + + let results = future::try_join_all(instances.into_iter().zip(requests.into_iter()).map( + |(instance, request)| { + common_runtime::spawn_write(async move { + instance + .grpc_delete(request) + .await + .context(RequestDatanodeSnafu) + }) + }, + )) + .await + .context(JoinTaskSnafu)?; + + let affected_rows = results.into_iter().sum::>()?; + Ok(Output::AffectedRows(affected_rows as _)) + } +} + +pub(super) fn to_grpc_delete_request( + table_name: &TableName, + region_number: RegionNumber, + request: DeleteRequest, +) -> Result { + let (key_columns, row_count) = to_grpc_columns(&request.key_column_values)?; + Ok(GrpcDeleteRequest { + table_name: table_name.table_name.clone(), + region_number, + key_columns, + row_count, + }) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::column::{SemanticType, Values}; + use api::v1::{Column, ColumnDataType}; + use datatypes::prelude::VectorRef; + use datatypes::vectors::Int32Vector; + + use super::*; + + #[test] + fn test_to_grpc_delete_request() { + let table_name = TableName { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: "foo".to_string(), + }; + let region_number = 1; + + let key_column_values = HashMap::from([( + "id".to_string(), + Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef, + )]); + let request = DeleteRequest { key_column_values }; + + let result = to_grpc_delete_request(&table_name, region_number, request).unwrap(); + + assert_eq!(result.table_name, "foo"); + assert_eq!(result.region_number, region_number); + assert_eq!( + result.key_columns, + vec![Column { + column_name: "id".to_string(), + semantic_type: SemanticType::Field as i32, + values: Some(Values { + i32_values: vec![1, 2, 3], + ..Default::default() + }), + null_mask: vec![0], + datatype: ColumnDataType::Int32 as i32, + }] + ); + assert_eq!(result.row_count, 3); + } +} diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index b134b16018..18f73766a9 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -13,79 +13,54 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; use api::helper::{push_vals, ColumnDataTypeWrapper}; use api::v1::column::SemanticType; use api::v1::{Column, InsertRequest as GrpcInsertRequest}; -use client::Database; use common_query::Output; -use datatypes::prelude::ConcreteDataType; -use snafu::{ensure, OptionExt, ResultExt}; +use datatypes::prelude::{ConcreteDataType, VectorRef}; +use futures::future; +use snafu::{ensure, ResultExt}; use store_api::storage::RegionNumber; use table::requests::InsertRequest; use super::DistTable; use crate::error; -use crate::error::{FindTableRouteSnafu, Result}; -use crate::table::scan::DatanodeInstance; +use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result}; impl DistTable { - pub async fn dist_insert( - &self, - inserts: HashMap, - ) -> Result { - let table_name = &self.table_name; - let route = self - .partition_manager - .find_table_route(&self.table_name) - .await - .with_context(|_| FindTableRouteSnafu { - table_name: table_name.to_string(), - })?; + pub async fn dist_insert(&self, inserts: Vec) -> Result { + let regions = inserts.iter().map(|x| x.region_number).collect::>(); + let instances = self.find_datanode_instances(®ions).await?; - let mut joins = Vec::with_capacity(inserts.len()); - for (region_id, insert) in inserts { - let datanode = route - .region_routes - .iter() - .find_map(|x| { - if x.region.id == region_id as u64 { - x.leader_peer.clone() - } else { - None - } + let results = future::try_join_all(instances.into_iter().zip(inserts.into_iter()).map( + |(instance, request)| { + common_runtime::spawn_write(async move { + instance + .grpc_insert(request) + .await + .context(RequestDatanodeSnafu) }) - .context(error::FindDatanodeSnafu { region: region_id })?; + }, + )) + .await + .context(JoinTaskSnafu)?; - let client = self.datanode_clients.get_client(&datanode).await; - let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); - let instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); - - let join = common_runtime::spawn_write(async move { - instance - .grpc_insert(to_grpc_insert_request(region_id, insert)?) - .await - .context(error::RequestDatanodeSnafu) - }); - - joins.push(join); - } - - let mut success = 0; - for join in joins { - let rows = join.await.context(error::JoinTaskSnafu)?? as usize; - success += rows; - } - Ok(Output::AffectedRows(success)) + let affected_rows = results.into_iter().sum::>()?; + Ok(Output::AffectedRows(affected_rows as _)) } } pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec, u32)> { + to_grpc_columns(&insert.columns_values) +} + +pub(crate) fn to_grpc_columns( + columns_values: &HashMap, +) -> Result<(Vec, u32)> { let mut row_count = None; - let columns = insert - .columns_values + let columns = columns_values .iter() .map(|(column_name, vector)| { match row_count { @@ -129,7 +104,7 @@ pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec Result { diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index a43b40a665..136565b53c 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -15,7 +15,7 @@ use std::fmt::Formatter; use std::sync::Arc; -use api::v1::InsertRequest; +use api::v1::{DeleteRequest, InsertRequest}; use client::Database; use common_query::prelude::Expr; use common_query::Output; @@ -51,6 +51,10 @@ impl DatanodeInstance { self.db.insert(request).await } + pub(crate) async fn grpc_delete(&self, request: DeleteRequest) -> client::Result { + self.db.delete(request).await + } + pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result { let logical_plan = self.build_logical_plan(&plan)?; diff --git a/src/frontend/src/tests/instance_test.rs b/src/frontend/src/tests/instance_test.rs index 051378291c..ea8948abad 100644 --- a/src/frontend/src/tests/instance_test.rs +++ b/src/frontend/src/tests/instance_test.rs @@ -776,8 +776,7 @@ async fn test_use_database(instance: Arc) { check_output_stream(output, expected).await; } -// should apply to both instances. tracked in #755 -#[apply(standalone_instance_case)] +#[apply(both_instances_cases)] async fn test_delete(instance: Arc) { let instance = instance.frontend(); diff --git a/src/partition/src/columns.rs b/src/partition/src/columns.rs index 4c5c13b33a..99d720adc0 100644 --- a/src/partition/src/columns.rs +++ b/src/partition/src/columns.rs @@ -19,7 +19,7 @@ use datatypes::value::Value; use snafu::ensure; use store_api::storage::RegionNumber; -use crate::error::{self, Error}; +use crate::error::{self, Result}; use crate::partition::{PartitionBound, PartitionExpr, PartitionRule}; /// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule] except that it allows @@ -145,7 +145,7 @@ impl PartitionRule for RangeColumnsPartitionRule { self.column_list.clone() } - fn find_region(&self, values: &[Value]) -> Result { + fn find_region(&self, values: &[Value]) -> Result { ensure!( values.len() == self.column_list.len(), error::RegionKeysSizeSnafu { @@ -166,7 +166,7 @@ impl PartitionRule for RangeColumnsPartitionRule { }) } - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Error> { + fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result> { let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) { let PartitionExpr { column: _, @@ -269,7 +269,7 @@ mod tests { value: value.into(), }, ]; - let regions = rule.find_regions(&exprs).unwrap(); + let regions = rule.find_regions_by_exprs(&exprs).unwrap(); assert_eq!( regions, expected_regions.into_iter().collect::>() @@ -332,7 +332,7 @@ mod tests { value: "hz".into(), }, ]; - let regions = rule.find_regions(&exprs).unwrap(); + let regions = rule.find_regions_by_exprs(&exprs).unwrap(); assert_eq!(regions, vec![1, 2, 3, 4, 5, 6]); } diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index b1511c6b87..5ffcd6a001 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -93,6 +93,9 @@ pub enum Error { #[snafu(display("Invalid InsertRequest, reason: {}", reason))] InvalidInsertRequest { reason: String, location: Location }, + #[snafu(display("Invalid DeleteRequest, reason: {}", reason))] + InvalidDeleteRequest { reason: String, location: Location }, + #[snafu(display( "Invalid table route data in meta, table name: {}, msg: {}", table_name, @@ -127,6 +130,7 @@ impl ErrorExt for Error { | Error::FindRegions { .. } | Error::RegionKeysSize { .. } | Error::InvalidInsertRequest { .. } + | Error::InvalidDeleteRequest { .. } | Error::FindPartitionColumn { .. } => StatusCode::InvalidArguments, Error::SerializeJson { .. } | Error::DeserializeJson { .. } => StatusCode::Internal, Error::InvalidTableRouteData { .. } => StatusCode::Internal, diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 2520b2429b..9e8b0b4446 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -209,7 +209,7 @@ impl PartitionRuleManager { } target.into_iter().collect::>() } else { - partition_rule.find_regions(&[])? + partition_rule.find_regions_by_exprs(&[])? }; ensure!( !regions.is_empty(), @@ -249,7 +249,7 @@ fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result>()); } @@ -277,7 +277,7 @@ fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result>()) } diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 9bd16c548a..0b107fa6b7 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::storage::RegionNumber; -use crate::error::{self, Error}; +use crate::error::{self, Error, Result}; pub type PartitionRuleRef = Arc; @@ -32,11 +32,15 @@ pub trait PartitionRule: Sync + Send { fn partition_columns(&self) -> Vec; - // TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop. - // Or find better names since one is mainly for writes and the other is for reads. - fn find_region(&self, values: &[Value]) -> Result; + /// Finds the target region by the partition values. + /// + /// Note that the `values` should have the same length as the `partition_columns`. + fn find_region(&self, values: &[Value]) -> Result; - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Error>; + /// Finds the target regions by the partition expressions. + /// + /// Note that the `exprs` should have the same length as the `partition_columns`. + fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result>; } /// The right bound(exclusive) of partition range. @@ -72,7 +76,7 @@ impl PartitionDef { impl TryFrom for PartitionDef { type Error = Error; - fn try_from(partition: MetaPartition) -> Result { + fn try_from(partition: MetaPartition) -> Result { let MetaPartition { column_list, value_list, @@ -86,7 +90,7 @@ impl TryFrom for PartitionDef { let partition_bounds = value_list .into_iter() .map(|x| serde_json::from_str(&String::from_utf8_lossy(&x))) - .collect::, serde_json::Error>>() + .collect::, serde_json::Error>>() .context(error::DeserializeJsonSnafu)?; Ok(PartitionDef { @@ -99,7 +103,7 @@ impl TryFrom for PartitionDef { impl TryFrom for MetaPartition { type Error = Error; - fn try_from(partition: PartitionDef) -> Result { + fn try_from(partition: PartitionDef) -> Result { let PartitionDef { partition_columns: columns, partition_bounds: bounds, @@ -113,7 +117,7 @@ impl TryFrom for MetaPartition { let value_list = bounds .into_iter() .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes())) - .collect::>, serde_json::Error>>() + .collect::>, serde_json::Error>>() .context(error::SerializeJsonSnafu)?; Ok(MetaPartition { diff --git a/src/partition/src/range.rs b/src/partition/src/range.rs index 23df5b296d..26639d7ffe 100644 --- a/src/partition/src/range.rs +++ b/src/partition/src/range.rs @@ -120,7 +120,7 @@ impl PartitionRule for RangePartitionRule { }) } - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Error> { + fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result, Error> { if exprs.is_empty() { return Ok(self.regions.clone()); } @@ -197,7 +197,7 @@ mod test { op, value: value.into(), }; - let regions = rule.find_regions(&[expr]).unwrap(); + let regions = rule.find_regions_by_exprs(&[expr]).unwrap(); assert_eq!( regions, expected_regions.into_iter().collect::>() diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index aadbb16ee7..eed4b4af7b 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -22,7 +22,10 @@ use snafu::{ensure, OptionExt}; use store_api::storage::RegionNumber; use table::requests::{DeleteRequest, InsertRequest}; -use crate::error::{FindPartitionColumnSnafu, FindRegionSnafu, InvalidInsertRequestSnafu, Result}; +use crate::error::{ + FindPartitionColumnSnafu, FindRegionSnafu, InvalidDeleteRequestSnafu, + InvalidInsertRequestSnafu, Result, +}; use crate::PartitionRuleRef; pub type InsertRequestSplit = HashMap; @@ -43,12 +46,77 @@ impl WriteSplitter { check_req(&insert)?; let column_names = self.partition_rule.partition_columns(); - let partition_columns = find_partitioning_values(&insert, &column_names)?; + let values = &insert.columns_values; + let partition_columns = find_partitioning_values(values, &column_names)?; let region_map = self.split_partitioning_values(&partition_columns)?; Ok(split_insert_request(&insert, region_map)) } + pub fn split_delete( + &self, + request: DeleteRequest, + key_column_names: Vec<&String>, + ) -> Result { + Self::validate_delete_request(&request)?; + + let partition_columns = self.partition_rule.partition_columns(); + let values = find_partitioning_values(&request.key_column_values, &partition_columns)?; + let regional_value_indexes = self.split_partitioning_values(&values)?; + + let requests = regional_value_indexes + .into_iter() + .map(|(region_id, value_indexes)| { + let key_column_values = request + .key_column_values + .iter() + .filter(|(column_name, _)| key_column_names.contains(column_name)) + .map(|(column_name, vector)| { + let mut builder = vector + .data_type() + .create_mutable_vector(value_indexes.len()); + + value_indexes.iter().for_each(|&index| { + builder.push_value_ref(vector.get(index).as_value_ref()); + }); + + (column_name.to_string(), builder.to_vector()) + }) + .collect(); + (region_id, DeleteRequest { key_column_values }) + }) + .collect(); + Ok(requests) + } + + fn validate_delete_request(request: &DeleteRequest) -> Result<()> { + let rows = request + .key_column_values + .values() + .next() + .map(|x| x.len()) + .context(InvalidDeleteRequestSnafu { + reason: "no key column values", + })?; + ensure!( + rows > 0, + InvalidDeleteRequestSnafu { + reason: "no rows in delete request" + } + ); + ensure!( + request + .key_column_values + .values() + .map(|x| x.len()) + .all(|x| x == rows), + InvalidDeleteRequestSnafu { + reason: "the lengths of key column values are not the same" + } + ); + Ok(()) + } + fn split_partitioning_values( &self, values: &[VectorRef], @@ -95,11 +163,9 @@ fn check_req(insert: &InsertRequest) -> Result<()> { } fn find_partitioning_values( - insert: &InsertRequest, + values: &HashMap, partition_columns: &[String], ) -> Result> { - let values = &insert.columns_values; - partition_columns .iter() .map(|column_name| { @@ -190,9 +256,7 @@ mod tests { use store_api::storage::RegionNumber; use table::requests::InsertRequest; - use super::{ - check_req, find_partitioning_values, partition_values, split_insert_request, WriteSplitter, - }; + use super::*; use crate::error::Error; use crate::partition::{PartitionExpr, PartitionRule}; use crate::PartitionRuleRef; @@ -331,7 +395,8 @@ mod tests { let insert = mock_insert_request(); let partition_column_names = vec!["host".to_string(), "id".to_string()]; - let columns = find_partitioning_values(&insert, &partition_column_names).unwrap(); + let columns = + find_partitioning_values(&insert.columns_values, &partition_column_names).unwrap(); let host_column = columns[0].clone(); assert_eq!( @@ -461,7 +526,7 @@ mod tests { unreachable!() } - fn find_regions(&self, _: &[PartitionExpr]) -> Result, Error> { + fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result, Error> { unimplemented!() } } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 338cc04879..811904ce25 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -155,7 +155,7 @@ impl GrpcQueryHandler for DummyInstance { ctx: QueryContextRef, ) -> std::result::Result { let output = match request { - Request::Insert(_) => unimplemented!(), + Request::Insert(_) | Request::Delete(_) => unimplemented!(), Request::Query(query_request) => { let query = query_request.query.unwrap(); match query { diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 916f77b5bf..70c3758ec7 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -130,6 +130,12 @@ pub enum Error { table_id: TableId, location: Location, }, + + #[snafu(display("Missing time index column in table: {}", table_name))] + MissingTimeIndexColumn { + table_name: String, + location: Location, + }, } impl ErrorExt for Error { @@ -152,7 +158,10 @@ impl ErrorExt for Error { Error::ParseTableOption { .. } | Error::EngineNotFound { .. } | Error::EngineExist { .. } => StatusCode::InvalidArguments, - Error::InvalidTable { .. } => StatusCode::Internal, + + Error::InvalidTable { .. } | Error::MissingTimeIndexColumn { .. } => { + StatusCode::Internal + } } } diff --git a/tests/cases/standalone/common/delete/delete.result b/tests/cases/standalone/common/delete/delete.result new file mode 100644 index 0000000000..e9e02b2d49 --- /dev/null +++ b/tests/cases/standalone/common/delete/delete.result @@ -0,0 +1,62 @@ +CREATE TABLE monitor (host STRING, ts TIMESTAMP, cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), PRIMARY KEY(host)); + +Affected Rows: 0 + +INSERT INTO monitor(ts, host, cpu, memory) VALUES +(1655276557000, 'host1', 66.6, 1024), +(1655276557000, 'host2', 66.6, 1024), +(1655276557000, 'host3', 66.6, 1024), +(1655276558000, 'host1', 77.7, 2048), +(1655276558000, 'host2', 77.7, 2048), +(1655276558000, 'host3', 77.7, 2048), +(1655276559000, 'host1', 88.8, 4096), +(1655276559000, 'host2', 88.8, 4096), +(1655276559000, 'host3', 88.8, 4096); + +Affected Rows: 9 + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + ++---------------------+-------+------+--------+ +| ts | host | cpu | memory | ++---------------------+-------+------+--------+ +| 2022-06-15T07:02:37 | host1 | 66.6 | 1024.0 | +| 2022-06-15T07:02:37 | host2 | 66.6 | 1024.0 | +| 2022-06-15T07:02:37 | host3 | 66.6 | 1024.0 | +| 2022-06-15T07:02:38 | host1 | 77.7 | 2048.0 | +| 2022-06-15T07:02:38 | host2 | 77.7 | 2048.0 | +| 2022-06-15T07:02:38 | host3 | 77.7 | 2048.0 | +| 2022-06-15T07:02:39 | host1 | 88.8 | 4096.0 | +| 2022-06-15T07:02:39 | host2 | 88.8 | 4096.0 | +| 2022-06-15T07:02:39 | host3 | 88.8 | 4096.0 | ++---------------------+-------+------+--------+ + +DELETE FROM monitor WHERE host = 'host1' AND ts = 1655276557000; + +Affected Rows: 1 + +DELETE FROM monitor WHERE host = 'host2'; + +Affected Rows: 3 + +DELETE FROM monitor WHERE cpu = 66.6; + +Affected Rows: 1 + +DELETE FROM monitor WHERE memory > 2048; + +Affected Rows: 2 + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + ++---------------------+-------+------+--------+ +| ts | host | cpu | memory | ++---------------------+-------+------+--------+ +| 2022-06-15T07:02:38 | host1 | 77.7 | 2048.0 | +| 2022-06-15T07:02:38 | host3 | 77.7 | 2048.0 | ++---------------------+-------+------+--------+ + +DROP TABLE monitor; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/delete/delete.sql b/tests/cases/standalone/common/delete/delete.sql new file mode 100644 index 0000000000..86986ef2c8 --- /dev/null +++ b/tests/cases/standalone/common/delete/delete.sql @@ -0,0 +1,26 @@ +CREATE TABLE monitor (host STRING, ts TIMESTAMP, cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), PRIMARY KEY(host)); + +INSERT INTO monitor(ts, host, cpu, memory) VALUES +(1655276557000, 'host1', 66.6, 1024), +(1655276557000, 'host2', 66.6, 1024), +(1655276557000, 'host3', 66.6, 1024), +(1655276558000, 'host1', 77.7, 2048), +(1655276558000, 'host2', 77.7, 2048), +(1655276558000, 'host3', 77.7, 2048), +(1655276559000, 'host1', 88.8, 4096), +(1655276559000, 'host2', 88.8, 4096), +(1655276559000, 'host3', 88.8, 4096); + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + +DELETE FROM monitor WHERE host = 'host1' AND ts = 1655276557000; + +DELETE FROM monitor WHERE host = 'host2'; + +DELETE FROM monitor WHERE cpu = 66.6; + +DELETE FROM monitor WHERE memory > 2048; + +SELECT ts, host, cpu, memory FROM monitor ORDER BY ts; + +DROP TABLE monitor; diff --git a/tests/cases/standalone/delete/delete.result b/tests/cases/standalone/delete/delete.result deleted file mode 100644 index 8622066d1d..0000000000 --- a/tests/cases/standalone/delete/delete.result +++ /dev/null @@ -1,35 +0,0 @@ -CREATE TABLE monitor ( host STRING, ts TIMESTAMP, cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), PRIMARY KEY(host)) ; - -Affected Rows: 0 - -insert into monitor(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 77.7, 2048, 1655276558000), ('host3', 88.8, 3072, 1655276559000); - -Affected Rows: 3 - -select * from monitor; - -+-------+---------------------+------+--------+ -| host | ts | cpu | memory | -+-------+---------------------+------+--------+ -| host1 | 2022-06-15T07:02:37 | 66.6 | 1024.0 | -| host2 | 2022-06-15T07:02:38 | 77.7 | 2048.0 | -| host3 | 2022-06-15T07:02:39 | 88.8 | 3072.0 | -+-------+---------------------+------+--------+ - -delete from monitor where host = 'host1' and ts = 1655276557000; - -Affected Rows: 1 - -select * from monitor; - -+-------+---------------------+------+--------+ -| host | ts | cpu | memory | -+-------+---------------------+------+--------+ -| host2 | 2022-06-15T07:02:38 | 77.7 | 2048.0 | -| host3 | 2022-06-15T07:02:39 | 88.8 | 3072.0 | -+-------+---------------------+------+--------+ - -drop table monitor; - -Affected Rows: 1 - diff --git a/tests/cases/standalone/delete/delete.sql b/tests/cases/standalone/delete/delete.sql deleted file mode 100644 index 2eccf144c7..0000000000 --- a/tests/cases/standalone/delete/delete.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE monitor ( host STRING, ts TIMESTAMP, cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), PRIMARY KEY(host)) ; - -insert into monitor(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 77.7, 2048, 1655276558000), ('host3', 88.8, 3072, 1655276559000); - -select * from monitor; - -delete from monitor where host = 'host1' and ts = 1655276557000; - -select * from monitor; - -drop table monitor; diff --git a/tests/cases/standalone/delete/delete_invalid.result b/tests/cases/standalone/delete/delete_invalid.result deleted file mode 100644 index 64aa607702..0000000000 --- a/tests/cases/standalone/delete/delete_invalid.result +++ /dev/null @@ -1,28 +0,0 @@ -CREATE TABLE monitor ( host STRING, ts TIMESTAMP, cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), PRIMARY KEY(host)) ; - -Affected Rows: 0 - -insert into monitor(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 77.7, 2048, 1655276558000), ('host3', 88.8, 3072, 1655276559000); - -Affected Rows: 3 - -delete from monitor where cpu = 66.6 and ts = 1655276557000; - -Affected Rows: 1 - -delete from monitor where host = 'host1' or ts = 1655276557000; - -Affected Rows: 0 - -delete from monitor where host = 'host1' or ts != 1655276557000; - -Affected Rows: 2 - -delete from monitor where ts != 1655276557000; - -Affected Rows: 0 - -drop table monitor; - -Affected Rows: 1 - diff --git a/tests/cases/standalone/delete/delete_invalid.sql b/tests/cases/standalone/delete/delete_invalid.sql deleted file mode 100644 index 68d7ccf2c3..0000000000 --- a/tests/cases/standalone/delete/delete_invalid.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE monitor ( host STRING, ts TIMESTAMP, cpu DOUBLE DEFAULT 0, memory DOUBLE, TIME INDEX (ts), PRIMARY KEY(host)) ; - -insert into monitor(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 77.7, 2048, 1655276558000), ('host3', 88.8, 3072, 1655276559000); - -delete from monitor where cpu = 66.6 and ts = 1655276557000; - -delete from monitor where host = 'host1' or ts = 1655276557000; - -delete from monitor where host = 'host1' or ts != 1655276557000; - -delete from monitor where ts != 1655276557000; - -drop table monitor;