diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 3a8e97ffa0..8606429434 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -305,9 +305,12 @@ mod test { use api::v1::column::{SemanticType, Values}; use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, InsertRequest, InsertRequests, QueryRequest, + CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, InsertRequests, + QueryRequest, TableId, }; use common_catalog::consts::MITO_ENGINE; + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; use datatypes::prelude::*; use query::parser::QueryLanguageParser; @@ -327,6 +330,230 @@ mod test { engine.execute(plan, QueryContext::arc()).await.unwrap() } + #[tokio::test(flavor = "multi_thread")] + async fn test_create_same_table_twice() { + // It should return TableAlreadyExists(4000) + let instance = MockInstance::new("test_create_same_table_twice").await; + let instance = instance.inner(); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "my_database".to_string(), + create_if_not_exists: true, + })), + }); + let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + engine: MITO_ENGINE.to_string(), + ..Default::default() + })), + }); + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let err = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap_err(); + assert!(matches!(err.status_code(), StatusCode::TableAlreadyExists)); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_drop_same_table_twice() { + // It should return TableNotFound(4001) + let instance = MockInstance::new("test_drop_same_table_twice").await; + let instance = instance.inner(); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "my_database".to_string(), + create_if_not_exists: true, + })), + }); + let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + engine: MITO_ENGINE.to_string(), + ..Default::default() + })), + }); + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(DropTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + table_id: Some(TableId { id: 1025 }), + })), + }); + + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let err = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap_err(); + assert!(matches!(err.status_code(), StatusCode::TableNotFound)); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_alter_table_twice() { + let instance = MockInstance::new("test_alter_table_twice").await; + let instance = instance.inner(); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "my_database".to_string(), + create_if_not_exists: true, + })), + }); + let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + engine: MITO_ENGINE.to_string(), + ..Default::default() + })), + }); + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(AlterExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + kind: Some(alter_expr::Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }), + is_key: true, + location: None, + }], + })), + ..Default::default() + })), + }); + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + // Updates `table_version` to latest. + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(AlterExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + kind: Some(alter_expr::Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }), + is_key: true, + location: None, + }], + })), + table_version: 1, + ..Default::default() + })), + }); + let err = instance + .do_query(query, QueryContext::arc()) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::TableColumnExists); + } + #[tokio::test(flavor = "multi_thread")] async fn test_handle_ddl() { let instance = MockInstance::new("test_handle_ddl").await;