From 2e2a82689cb6daa8c60c1b6472a1df38d06415d2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 19 Jul 2023 17:26:13 +0900 Subject: [PATCH] fix: alter procedure table not found issue (#1993) * fix: alter procedure table not found issue * chore: apply suggestions * chore: apply suggestions from CR --- src/datanode/src/error.rs | 8 +++- src/datanode/src/instance/grpc.rs | 75 ++++++++++++++++++++++++++++++- src/datanode/src/server/grpc.rs | 34 ++++++++------ src/datanode/src/sql/alter.rs | 61 ++++++++++++++++++------- 4 files changed, 146 insertions(+), 32 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 7b5fc5c6e5..8db4b8b552 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -476,6 +476,12 @@ pub enum Error { source: common_runtime::JoinError, location: Location, }, + + #[snafu(display("Unexpected, violated: {}", violated))] + Unexpected { + violated: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -530,7 +536,7 @@ impl ErrorExt for Error { | MissingWalDirConfig { .. } | PrepareImmutableTable { .. } => StatusCode::InvalidArguments, - EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { + EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => { StatusCode::Unexpected } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 8606429434..79fcba5dfd 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -306,7 +306,7 @@ mod test { use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, InsertRequests, - QueryRequest, TableId, + QueryRequest, RenameTable, TableId, }; use common_catalog::consts::MITO_ENGINE; use common_error::ext::ErrorExt; @@ -554,6 +554,79 @@ mod test { assert_eq!(err.status_code(), StatusCode::TableColumnExists); } + #[tokio::test(flavor = "multi_thread")] + async fn test_rename_table_twice() { + common_telemetry::init_default_ut_logging(); + let instance = MockInstance::new("test_alter_table_twice").await; + let instance = instance.inner(); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "my_database".to_string(), + create_if_not_exists: true, + })), + }); + let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + engine: MITO_ENGINE.to_string(), + table_id: Some(TableId { id: 1025 }), + ..Default::default() + })), + }); + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let query = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(AlterExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + kind: Some(alter_expr::Kind::RenameTable(RenameTable { + new_table_name: "new_my_table".to_string(), + })), + table_id: Some(TableId { id: 1025 }), + ..Default::default() + })), + }); + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + // renames it again. + let output = instance + .do_query(query.clone(), QueryContext::arc()) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + } + #[tokio::test(flavor = "multi_thread")] async fn test_handle_ddl() { let instance = MockInstance::new("test_handle_ddl").await; diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 59615964f2..19d6b60ecb 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -70,21 +70,27 @@ impl Instance { } pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result { - let table = self - .catalog_manager - .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &expr.catalog_name, - &expr.schema_name, - &expr.table_name, - ), - })?; + let table_id = match expr.table_id.as_ref() { + None => { + self.catalog_manager + .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &expr.catalog_name, + &expr.schema_name, + &expr.table_name, + ), + })? + .table_info() + .ident + .table_id + } + Some(table_id) => table_id.id, // For requests from Metasrv. + }; - let request = alter_expr_to_request(table.table_info().ident.table_id, expr) - .context(AlterExprToRequestSnafu)?; + let request = alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?; self.sql_handler() .execute(SqlRequest::Alter(request), QueryContext::arc()) .await diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index c6db4e6eeb..efe1084a8b 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -23,7 +23,7 @@ use table::metadata::TableId; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest}; use table_procedure::AlterTableProcedure; -use crate::error::{self, Result}; +use crate::error::{self, Result, UnexpectedSnafu}; use crate::sql::SqlHandler; impl SqlHandler { @@ -35,25 +35,54 @@ impl SqlHandler { table: &table_name, }; - let table = self.get_table(&table_ref).await?; - let engine_procedure = self.engine_procedure(table)?; + match self.get_table(&table_ref).await { + Ok(table) => { + let engine_procedure = self.engine_procedure(table)?; - let procedure = - AlterTableProcedure::new(req, self.catalog_manager.clone(), engine_procedure); - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let procedure_id = procedure_with_id.id; + let procedure = + AlterTableProcedure::new(req, self.catalog_manager.clone(), engine_procedure); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; - info!("Alter table {} by procedure {}", table_name, procedure_id); + info!("Alter table {} by procedure {}", table_name, procedure_id); - let mut watcher = self - .procedure_manager - .submit(procedure_with_id) - .await - .context(error::SubmitProcedureSnafu { procedure_id })?; + let mut watcher = self + .procedure_manager + .submit(procedure_with_id) + .await + .context(error::SubmitProcedureSnafu { procedure_id })?; + + watcher::wait(&mut watcher) + .await + .context(error::WaitProcedureSnafu { procedure_id })?; + } + Err(err) => { + // TODO(weny): Retrieves table by table_id + if let AlterKind::RenameTable { new_table_name } = req.alter_kind { + let new_table_ref = TableReference { + catalog: &req.catalog_name, + schema: &req.schema_name, + table: &new_table_name, + }; + + let table = self.get_table(&new_table_ref).await?; + + ensure!( + table.table_info().table_id() == req.table_id, + UnexpectedSnafu { + violated: format!( + "expected table id: {}, actual: {}", + req.table_id, + table.table_info().table_id() + ) + } + ) + } else { + return Err(err); + } + } + } - watcher::wait(&mut watcher) - .await - .context(error::WaitProcedureSnafu { procedure_id })?; // Tried in MySQL, it really prints "Affected Rows: 0". Ok(Output::AffectedRows(0)) }