mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 23:40:38 +00:00
feat: Support the DELETE SQL statement (#942)
* [WIP]:delete sql * [fix]:time parser bug * [fix]:resolve conflict * [fmt]:cargo fmt * [fix]:remove unless log * [fix]:test * [feat]:add error parse * [fix]:resolve conflict * [fix]:remove unless code * [fix]:remove unless code * [test]:add IT * [fix]:add license * [fix]:ci * [fix]:ci * [fix]:ci * [fix]:remove * [fix]:ci * [feat]:add sql * [fix]:modify sql * [feat]:refactor parser_expr * [feat]:rm backtrace * [fix]:ci * [fix]: conversation * [fix]: conversation * feat:refactor delete * feat:refactor delete * fix:resolve conversation * fix:ut * fix:ut * fix:conversation * fix:conversation * fix:conservation --------- Co-authored-by: xieqijun <qijun@apache.org>
This commit is contained in:
@@ -114,6 +114,17 @@ pub enum Error {
|
||||
source: TableError,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to delete value from table: {}, source: {}",
|
||||
table_name,
|
||||
source
|
||||
))]
|
||||
Delete {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: TableError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to start server, source: {}", source))]
|
||||
StartServer {
|
||||
#[snafu(backtrace)]
|
||||
@@ -161,7 +172,10 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid SQL, error: {}", msg))]
|
||||
InvalidSql { msg: String, backtrace: Backtrace },
|
||||
InvalidSql { msg: String },
|
||||
|
||||
#[snafu(display("Not support SQL, error: {}", msg))]
|
||||
NotSupportSql { msg: String },
|
||||
|
||||
#[snafu(display("Failed to create schema when creating table, source: {}", source))]
|
||||
CreateSchema {
|
||||
@@ -343,6 +357,7 @@ impl ErrorExt for Error {
|
||||
Error::DropTable { source, .. } => source.status_code(),
|
||||
|
||||
Error::Insert { source, .. } => source.status_code(),
|
||||
Error::Delete { source, .. } => source.status_code(),
|
||||
|
||||
Error::TableNotFound { .. } => StatusCode::TableNotFound,
|
||||
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
|
||||
@@ -361,6 +376,7 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::ColumnValuesNumberMismatch { .. }
|
||||
| Error::InvalidSql { .. }
|
||||
| Error::NotSupportSql { .. }
|
||||
| Error::KeyColumnNotFound { .. }
|
||||
| Error::InvalidPrimaryKey { .. }
|
||||
| Error::MissingTimestampColumn { .. }
|
||||
|
||||
@@ -66,7 +66,10 @@ impl Instance {
|
||||
)?;
|
||||
self.sql_handler.execute(request, query_ctx).await
|
||||
}
|
||||
|
||||
QueryStatement::Sql(Statement::Delete(d)) => {
|
||||
let request = SqlRequest::Delete(*d);
|
||||
self.sql_handler.execute(request, query_ctx).await
|
||||
}
|
||||
QueryStatement::Sql(Statement::CreateDatabase(c)) => {
|
||||
let request = CreateDatabaseRequest {
|
||||
db_name: c.name.to_string(),
|
||||
|
||||
@@ -19,17 +19,20 @@ use query::query_engine::QueryEngineRef;
|
||||
use query::sql::{describe_table, explain, show_databases, show_tables};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use sql::statements::delete::Delete;
|
||||
use sql::statements::describe::DescribeTable;
|
||||
use sql::statements::explain::Explain;
|
||||
use sql::statements::show::{ShowDatabases, ShowTables};
|
||||
use table::engine::TableEngineRef;
|
||||
use table::engine::{EngineContext, TableEngineRef, TableReference};
|
||||
use table::requests::*;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{self, ExecuteSqlSnafu, Result, TableNotFoundSnafu};
|
||||
use crate::error::{self, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu};
|
||||
use crate::instance::sql::table_idents_to_full_name;
|
||||
|
||||
mod alter;
|
||||
mod create;
|
||||
mod delete;
|
||||
mod drop_table;
|
||||
mod insert;
|
||||
|
||||
@@ -44,6 +47,7 @@ pub enum SqlRequest {
|
||||
ShowTables(ShowTables),
|
||||
DescribeTable(DescribeTable),
|
||||
Explain(Box<Explain>),
|
||||
Delete(Delete),
|
||||
}
|
||||
|
||||
// Handler to execute SQL except query
|
||||
@@ -77,6 +81,7 @@ impl SqlHandler {
|
||||
SqlRequest::CreateDatabase(req) => self.create_database(req).await,
|
||||
SqlRequest::Alter(req) => self.alter(req).await,
|
||||
SqlRequest::DropTable(req) => self.drop_table(req).await,
|
||||
SqlRequest::Delete(stmt) => self.delete(query_ctx.clone(), stmt).await,
|
||||
SqlRequest::ShowDatabases(stmt) => {
|
||||
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
|
||||
}
|
||||
@@ -108,6 +113,17 @@ impl SqlHandler {
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) fn get_table(&self, table_ref: &TableReference) -> Result<TableRef> {
|
||||
self.table_engine
|
||||
.get_table(&EngineContext::default(), table_ref)
|
||||
.with_context(|_| GetTableSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn table_engine(&self) -> TableEngineRef {
|
||||
self.table_engine.clone()
|
||||
}
|
||||
|
||||
142
src/datanode/src/sql/delete.rs
Normal file
142
src/datanode/src/sql/delete.rs
Normal file
@@ -0,0 +1,142 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::Output;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::vectors::StringVector;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use sql::ast::{BinaryOperator, Expr, Value};
|
||||
use sql::statements::delete::Delete;
|
||||
use sql::statements::sql_value_to_value;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::DeleteRequest;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{ColumnNotFoundSnafu, DeleteSnafu, InvalidSqlSnafu, NotSupportSqlSnafu, Result};
|
||||
use crate::instance::sql::table_idents_to_full_name;
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
impl SqlHandler {
|
||||
pub(crate) async fn delete(&self, query_ctx: QueryContextRef, stmt: Delete) -> Result<Output> {
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
table_idents_to_full_name(stmt.table_name(), query_ctx)?;
|
||||
let table_ref = TableReference {
|
||||
catalog: &catalog_name.to_string(),
|
||||
schema: &schema_name.to_string(),
|
||||
table: &table_name.to_string(),
|
||||
};
|
||||
|
||||
let table = self.get_table(&table_ref)?;
|
||||
|
||||
let req = DeleteRequest {
|
||||
key_column_values: parse_selection(stmt.selection(), &table)?,
|
||||
};
|
||||
|
||||
let affected_rows = table.delete(req).await.with_context(|_| DeleteSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(Output::AffectedRows(affected_rows))
|
||||
}
|
||||
}
|
||||
|
||||
/// parse selection, currently supported format is `tagkey1 = 'tagvalue1' and 'ts' = 'value'`.
|
||||
/// (only uses =, and in the where clause and provides all columns needed by the key.)
|
||||
fn parse_selection(
|
||||
selection: &Option<Expr>,
|
||||
table: &TableRef,
|
||||
) -> Result<HashMap<String, VectorRef>> {
|
||||
let mut key_column_values = HashMap::new();
|
||||
if let Some(expr) = selection {
|
||||
parse_expr(expr, &mut key_column_values, table)?;
|
||||
}
|
||||
Ok(key_column_values)
|
||||
}
|
||||
|
||||
fn parse_expr(
|
||||
expr: &Expr,
|
||||
key_column_values: &mut HashMap<String, VectorRef>,
|
||||
table: &TableRef,
|
||||
) -> Result<()> {
|
||||
// match BinaryOp
|
||||
if let Expr::BinaryOp { left, op, right } = expr {
|
||||
match (&**left, op, &**right) {
|
||||
// match And operator
|
||||
(Expr::BinaryOp { .. }, BinaryOperator::And, Expr::BinaryOp { .. }) => {
|
||||
parse_expr(left, key_column_values, table)?;
|
||||
parse_expr(right, key_column_values, table)?;
|
||||
return Ok(());
|
||||
}
|
||||
// match Eq operator
|
||||
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Value(value)) => {
|
||||
key_column_values.insert(
|
||||
column_name.to_string(),
|
||||
value_to_vector(&column_name.to_string(), value, table)?,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Identifier(value)) => {
|
||||
key_column_values.insert(
|
||||
column_name.to_string(),
|
||||
Arc::new(StringVector::from(vec![value.to_string()])),
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
NotSupportSqlSnafu {
|
||||
msg: format!(
|
||||
"Not support sql expr:{expr},correct format is tagkey1 = tagvalue1 and ts = value"
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
/// parse value to vector
|
||||
fn value_to_vector(column_name: &String, sql_value: &Value, table: &TableRef) -> Result<VectorRef> {
|
||||
let schema = table.schema();
|
||||
let column_schema =
|
||||
schema
|
||||
.column_schema_by_name(column_name)
|
||||
.with_context(|| ColumnNotFoundSnafu {
|
||||
table_name: table.table_info().name.clone(),
|
||||
column_name: column_name.to_string(),
|
||||
})?;
|
||||
let data_type = &column_schema.data_type;
|
||||
let value = sql_value_to_value(column_name, data_type, sql_value);
|
||||
match value {
|
||||
Ok(value) => {
|
||||
let mut vec = data_type.create_mutable_vector(1);
|
||||
if vec.push_value_ref(value.as_value_ref()).is_err() {
|
||||
return InvalidSqlSnafu {
|
||||
msg: format!(
|
||||
"invalid sql, column name is {column_name}, value is {sql_value}",
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
Ok(vec.to_vector())
|
||||
}
|
||||
_ => InvalidSqlSnafu {
|
||||
msg: format!("invalid sql, column name is {column_name}, value is {sql_value}",),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
@@ -26,8 +26,8 @@ use table::requests::*;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu,
|
||||
ColumnValuesNumberMismatchSnafu, FindTableSnafu, InsertSnafu, ParseSqlSnafu,
|
||||
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
|
||||
ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlSnafu, ParseSqlValueSnafu, Result,
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use crate::sql::{SqlHandler, SqlRequest};
|
||||
|
||||
@@ -43,15 +43,7 @@ impl SqlHandler {
|
||||
table: &req.table_name.to_string(),
|
||||
};
|
||||
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(table_ref.catalog, table_ref.schema, table_ref.table)
|
||||
.context(FindTableSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?
|
||||
.context(TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
let table = self.get_table(&table_ref)?;
|
||||
|
||||
let affected_rows = table.insert(req).await.with_context(|_| InsertSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
|
||||
@@ -654,6 +654,55 @@ async fn test_use_database() {
|
||||
check_output_stream(output, expected).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_delete() {
|
||||
let instance = MockInstance::new("test_delete").await;
|
||||
|
||||
let output = execute_sql(
|
||||
&instance,
|
||||
r#"create table test_table(
|
||||
host string,
|
||||
ts timestamp,
|
||||
cpu double default 0,
|
||||
memory double,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host)
|
||||
) engine=mito with(regions=1);"#,
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(output, Output::AffectedRows(0)));
|
||||
|
||||
let output = execute_sql(
|
||||
&instance,
|
||||
r#"insert into test_table(host, cpu, memory, ts) values
|
||||
('host1', 66.6, 1024, 1655276557000),
|
||||
('host2', 77.7, 2048, 1655276558000),
|
||||
('host3', 88.8, 3072, 1655276559000)
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(output, Output::AffectedRows(3)));
|
||||
|
||||
let output = execute_sql(
|
||||
&instance,
|
||||
"delete from test_table where host = host1 and ts = 1655276557000 ",
|
||||
)
|
||||
.await;
|
||||
assert!(matches!(output, Output::AffectedRows(1)));
|
||||
|
||||
let output = execute_sql(&instance, "select * from test_table").await;
|
||||
let expect = "\
|
||||
+-------+---------------------+------+--------+
|
||||
| host | ts | cpu | memory |
|
||||
+-------+---------------------+------+--------+
|
||||
| host2 | 2022-06-15T07:02:38 | 77.7 | 2048 |
|
||||
| host3 | 2022-06-15T07:02:39 | 88.8 | 3072 |
|
||||
+-------+---------------------+------+--------+\
|
||||
"
|
||||
.to_string();
|
||||
check_output_stream(output, expect).await;
|
||||
}
|
||||
|
||||
async fn execute_sql(instance: &MockInstance, sql: &str) -> Output {
|
||||
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user